Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/06/10 01:18:49 UTC

[accumulo] 01/02: Fix #1052 Correct distributed cache usage (#1112)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit ecd98de9ff9a1a2a46849a6c3f26ba7468cdbac6
Author: Jeffrey L. Zeiberg <Je...@asrcfederal.com>
AuthorDate: Wed Apr 10 07:46:24 2019 -0400

    Fix #1052 Correct distributed cache usage (#1112)
    
    * Correct distributed cache usage for the token file, client properties
      file, and RangePartitioner cut-point files.
---
 .../mapreduce/lib/partition/RangePartitioner.java  | 57 +++++++++++++---------
 .../clientImpl/mapreduce/lib/ConfiguratorBase.java | 22 +++++----
 .../mapreduce/lib/DistributedCacheHelper.java      |  1 -
 .../mapreduce/partition/RangePartitioner.java      | 57 +++++++++++++---------
 .../hadoopImpl/mapreduce/lib/ConfiguratorBase.java | 24 ++++++---
 .../accumulo/test/mapreduce/TokenFileIT.java       | 30 +++++++++++-
 6 files changed, 129 insertions(+), 62 deletions(-)
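
For context on the approach: Hadoop's distributed cache supports a URI-fragment convention, where a cache file registered as "<path>#<name>" is localized on each node and exposed in the task's working directory through a symlink named <name>, so task code can open it under that stable name. A minimal sketch of the pattern, using a hypothetical HDFS path and symlink name rather than the constants in this patch:

    import java.io.File;
    import java.net.URI;
    import java.net.URISyntaxException;

    import org.apache.hadoop.mapreduce.Job;

    public class CacheSymlinkSketch {
      // Driver side: the "#splits" fragment asks the framework to expose the
      // localized copy as a symlink named "splits" in each task's working dir.
      public static void register(Job job) throws URISyntaxException {
        job.addCacheFile(new URI("hdfs:///user/demo/table-splits.txt#splits"));
      }

      // Task side: prefer the localized symlink; fall back to the original
      // path when no localization happened (e.g. under the local job runner).
      public static String resolve(String originalPath) {
        File localized = new File("splits");
        return localized.exists() ? localized.getPath() : originalPath;
      }
    }

This resolve-locally-or-fall-back shape is what the RangePartitioner and ConfiguratorBase changes below implement.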

diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
index 6b32130..8461ff3 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
@@ -16,21 +16,22 @@
  */
 package org.apache.accumulo.core.client.mapreduce.lib.partition;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Scanner;
 import java.util.TreeSet;
 
+import javax.imageio.IIOException;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -90,26 +91,31 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
       justification = "path provided by distributed cache framework, not user input")
   private synchronized Text[] getCutPoints() throws IOException {
     if (cutPointArray == null) {
+      Path path;
       String cutFileName = conf.get(CUTFILE_KEY);
-      Path[] cf = Job.getInstance().getLocalCacheFiles();
-
-      if (cf != null) {
-        for (Path path : cf) {
-          if (path.toUri().getPath()
-              .endsWith(cutFileName.substring(cutFileName.lastIndexOf('/')))) {
-            TreeSet<Text> cutPoints = new TreeSet<>();
-            try (Scanner in = new Scanner(new BufferedReader(
-                new InputStreamReader(new FileInputStream(path.toString()), UTF_8)))) {
-              while (in.hasNextLine())
-                cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine())));
-            }
-            cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
-            break;
-          }
+      File tempFile = new File(CUTFILE_KEY);
+      if (tempFile.exists()) {
+        path = new Path(CUTFILE_KEY);
+      } else {
+        path = new Path(cutFileName);
+      }
+
+      if (path == null)
+        throw new FileNotFoundException("Cut point file not found in distributed cache");
+
+      TreeSet<Text> cutPoints = new TreeSet<>();
+      FileSystem fs = FileSystem.get(conf);
+      FSDataInputStream inputStream = fs.open(path);
+      try (Scanner in = new Scanner(inputStream)) {
+        while (in.hasNextLine()) {
+          cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine())));
         }
       }
+
+      cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
+
       if (cutPointArray == null)
-        throw new FileNotFoundException(cutFileName + " not found in distributed cache");
+        throw new IIOException("Cutpoint array not properly created from file " + path.getName());
     }
     return cutPointArray;
   }
@@ -129,7 +135,14 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
    * points that represent ranges for partitioning
    */
   public static void setSplitFile(Job job, String file) {
-    URI uri = new Path(file).toUri();
+    URI uri;
+    try {
+      uri = new URI(file + "#" + CUTFILE_KEY);
+    } catch (URISyntaxException e) {
+      throw new IllegalStateException(
+          "Unable to add split file \"" + CUTFILE_KEY + "\" to distributed cache.");
+    }
+
     job.addCacheFile(uri);
     job.getConfiguration().set(CUTFILE_KEY, uri.getPath());
   }
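
Callers configure the partitioner the same way as before; the fragment handling is internal to setSplitFile. A hedged driver sketch, with a hypothetical split file location:

    import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
    import org.apache.hadoop.mapreduce.Job;

    public class PartitionerDriverSketch {
      public static void configure(Job job, String splitFileInHdfs) {
        job.setPartitionerClass(RangePartitioner.class);
        // setSplitFile now registers the file with a "#<CUTFILE_KEY>" fragment,
        // so each task can open the cut-point file by its symlink name instead
        // of scanning Job.getLocalCacheFiles().
        RangePartitioner.setSplitFile(job, splitFileInHdfs);
      }
    }

The split file itself holds one Base64-encoded cut point per line, which is what getCutPoints() above decodes into the sorted Text array.
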
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
index acced30..bdbbdb0 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
@@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -62,6 +63,8 @@ public class ConfiguratorBase {
     IS_CONFIGURED, PRINCIPAL, TOKEN
   }
 
+  public static final String cachedFileName = "tokenfile";
+
   public enum TokenSource {
     FILE, INLINE, JOB;
 
@@ -189,7 +192,7 @@ public class ConfiguratorBase {
     checkArgument(tokenFile != null, "tokenFile is null");
 
     try {
-      DistributedCacheHelper.addCacheFile(new URI(tokenFile), conf);
+      DistributedCacheHelper.addCacheFile(new URI(tokenFile + "#" + cachedFileName), conf);
     } catch (URISyntaxException e) {
       throw new IllegalStateException(
           "Unable to add tokenFile \"" + tokenFile + "\" to distributed cache.");
@@ -284,16 +287,17 @@ public class ConfiguratorBase {
       String tokenFile) {
     FSDataInputStream in = null;
     try {
-      URI[] uris = DistributedCacheHelper.getCacheFiles(conf);
-      Path path = null;
-      for (URI u : uris) {
-        if (u.toString().equals(tokenFile)) {
-          path = new Path(u);
-        }
+      Path path;
+      // See if the "tokenfile" symlink was created and try to open the file it points to by it.
+      File tempFile = new File(ConfiguratorBase.cachedFileName);
+      if (tempFile.exists()) {
+        path = new Path(ConfiguratorBase.cachedFileName);
+      } else {
+        path = new Path(tokenFile);
       }
       if (path == null) {
-        throw new IllegalArgumentException(
-            "Couldn't find password file called \"" + tokenFile + "\" in cache.");
+        throw new IllegalArgumentException("Couldn't find password file called \"" + tokenFile
+            + "\" in the distributed cache or the specified path in the distributed filesystem.");
       }
       FileSystem fs = FileSystem.get(conf);
       in = fs.open(path);
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java
index ead3c2a..49ddc74 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java
@@ -41,5 +41,4 @@ public class DistributedCacheHelper {
   public static URI[] getCacheFiles(Configuration conf) throws IOException {
     return org.apache.hadoop.filecache.DistributedCache.getCacheFiles(conf);
   }
-
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java
index 7304904..7e18f5c 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java
@@ -16,21 +16,22 @@
  */
 package org.apache.accumulo.hadoop.mapreduce.partition;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Scanner;
 import java.util.TreeSet;
 
+import javax.imageio.IIOException;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -88,26 +89,31 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
       justification = "path provided by distributed cache framework, not user input")
   private synchronized Text[] getCutPoints() throws IOException {
     if (cutPointArray == null) {
+      Path path;
       String cutFileName = conf.get(CUTFILE_KEY);
-      Path[] cf = Job.getInstance().getLocalCacheFiles();
-
-      if (cf != null) {
-        for (Path path : cf) {
-          if (path.toUri().getPath()
-              .endsWith(cutFileName.substring(cutFileName.lastIndexOf('/')))) {
-            TreeSet<Text> cutPoints = new TreeSet<>();
-            try (Scanner in = new Scanner(new BufferedReader(
-                new InputStreamReader(new FileInputStream(path.toString()), UTF_8)))) {
-              while (in.hasNextLine())
-                cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine())));
-            }
-            cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
-            break;
-          }
+      File tempFile = new File(CUTFILE_KEY);
+      if (tempFile.exists()) {
+        path = new Path(CUTFILE_KEY);
+      } else {
+        path = new Path(cutFileName);
+      }
+
+      if (path == null)
+        throw new FileNotFoundException("Cut point file not found in distributed cache");
+
+      TreeSet<Text> cutPoints = new TreeSet<>();
+      FileSystem fs = FileSystem.get(conf);
+      FSDataInputStream inputStream = fs.open(path);
+      try (Scanner in = new Scanner(inputStream)) {
+        while (in.hasNextLine()) {
+          cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine())));
         }
       }
+
+      cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
+
       if (cutPointArray == null)
-        throw new FileNotFoundException(cutFileName + " not found in distributed cache");
+        throw new IIOException("Cutpoint array not properly created from file " + path.getName());
     }
     return cutPointArray;
   }
@@ -127,7 +133,14 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
    * points that represent ranges for partitioning
    */
   public static void setSplitFile(Job job, String file) {
-    URI uri = new Path(file).toUri();
+    URI uri;
+    try {
+      uri = new URI(file + "#" + CUTFILE_KEY);
+    } catch (URISyntaxException e) {
+      throw new IllegalStateException(
+          "Unable to add split file \"" + CUTFILE_KEY + "\" to distributed cache.");
+    }
+
     job.addCacheFile(uri);
     job.getConfiguration().set(CUTFILE_KEY, uri.getPath());
   }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
index 315ce2a..367046c 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.hadoopImpl.mapreduce.lib;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
@@ -43,6 +44,8 @@ public class ConfiguratorBase {
     CLIENT_PROPS, CLIENT_PROPS_FILE, IS_CONFIGURED, STORE_JOB_CALLED
   }
 
+  public static final String clientPropsFileName = "propsfile";
+
   /**
    * Configuration keys for general configuration options.
    *
@@ -62,6 +65,7 @@ public class ConfiguratorBase {
    * @return the configuration key
    * @since 1.6.0
    */
+
   protected static String enumToConfKey(Class<?> implementingClass, Enum<?> e) {
     return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName() + "."
         + StringUtils.camelize(e.name().toLowerCase());
@@ -83,7 +87,8 @@ public class ConfiguratorBase {
       Properties props, String clientPropsPath) {
     if (clientPropsPath != null) {
       try {
-        DistributedCacheHelper.addCacheFile(new URI(clientPropsPath), conf);
+        DistributedCacheHelper.addCacheFile(new URI(clientPropsPath + "#" + clientPropsFileName),
+            conf);
       } catch (URISyntaxException e) {
         throw new IllegalStateException("Unable to add client properties file \"" + clientPropsPath
             + "\" to distributed cache.");
@@ -107,13 +112,18 @@ public class ConfiguratorBase {
         conf.get(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS_FILE), "");
     if (!clientPropsFile.isEmpty()) {
       try {
-        URI[] uris = DistributedCacheHelper.getCacheFiles(conf);
-        Path path = null;
-        for (URI u : uris) {
-          if (u.toString().equals(clientPropsFile)) {
-            path = new Path(u);
-          }
+        Path path;
+        // See if the "propsfile" symlink was created and try to open the file it points to by it.
+        File tempFile = new File(ConfiguratorBase.clientPropsFileName);
+        if (tempFile.exists()) {
+          path = new Path(ConfiguratorBase.clientPropsFileName);
+        } else {
+          path = new Path(clientPropsFile);
         }
+
+        if (path == null)
+          throw new IllegalStateException("Could not initialize properties file");
+
         FileSystem fs = FileSystem.get(conf);
         FSDataInputStream inputStream = fs.open(path);
         StringBuilder sb = new StringBuilder();
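
The stream read back here carries an ordinary Java properties file; once the "propsfile" symlink exists in the task's working directory, an equivalent local read is possible. A minimal sketch under that assumption:

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public class PropsFileSketch {
      public static Properties load() throws IOException {
        Properties props = new Properties();
        // "propsfile" is the symlink name that addClientPropsFile registered
        // via the URI fragment above.
        try (InputStream in = new FileInputStream("propsfile")) {
          props.load(in);
        }
        return props;
      }
    }
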
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
index c43b001..25a814a 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
@@ -24,8 +24,13 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -34,6 +39,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.clientImpl.ClientInfo;
 import org.apache.accumulo.core.clientImpl.Credentials;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.ConfiguratorBase;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
@@ -84,6 +90,29 @@ public class TokenFileIT extends AccumuloClusterHarness {
         m.put("", "", Integer.toString(count));
         context.write(new Text(), m);
       }
+
+      @Override
+      protected void setup(Context context) throws IOException, InterruptedException {
+        if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
+          // At this point in the MapReduce job, the task can retrieve the cached files if needed
+          URI[] cachedFiles = context.getCacheFiles();
+          // Access the file through the HDFS fragment name (symlink) created
+          // during caching in ConfiguratorBase
+          String fileByPseudonym =
+              getFileContents(ConfiguratorBase.cachedFileName);
+
+          assertTrue(!fileByPseudonym.isEmpty());
+          assertTrue(cachedFiles.length > 0);
+        }
+        super.setup(context);
+      }
+
+      private String getFileContents(String filename) throws IOException {
+        Path filePath = Paths.get(filename);
+        return Files.lines(filePath)
+            .collect(Collectors.joining(System.lineSeparator()));
+      }
+
     }
 
     @Override
@@ -139,7 +168,6 @@ public class TokenFileIT extends AccumuloClusterHarness {
         return 1;
       }
     }
-
   }
 
   @Rule