You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by su...@apache.org on 2017/08/08 17:28:42 UTC

hive git commit: HIVE-16758: Better Select Number of Replications (BELUGA BEHR, reviewed by Chao Sun)

Repository: hive
Updated Branches:
  refs/heads/master f778e8c50 -> f067df6f5


HIVE-16758: Better Select Number of Replications (BELUGA BEHR, reviewed by Chao Sun)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f067df6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f067df6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f067df6f

Branch: refs/heads/master
Commit: f067df6f5817429d52a96fb78cdbcfbe83c1f497
Parents: f778e8c
Author: BELUGA BEHR <da...@gmail.com>
Authored: Tue Aug 8 10:27:18 2017 -0700
Committer: Chao Sun <su...@apache.org>
Committed: Tue Aug 8 10:27:18 2017 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/SparkHashTableSinkOperator.java   | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f067df6f/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
index c3b1d0a..7c1b714 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
@@ -49,11 +49,13 @@ public class SparkHashTableSinkOperator
   private final String CLASS_NAME = this.getClass().getName();
   private final transient PerfLogger perfLogger = SessionState.getPerfLogger();
   protected static final Logger LOG = LoggerFactory.getLogger(SparkHashTableSinkOperator.class.getName());
-  public static final String DFS_REPLICATION_MAX = "dfs.replication.max";
-  private int minReplication = 10;
+  private static final String MAPRED_FILE_REPLICATION = "mapreduce.client.submit.file.replication";
+  private static final int DEFAULT_REPLICATION = 10;
 
   private final HashTableSinkOperator htsOperator;
 
+  private short numReplication;
+
   /** Kryo ctor. */
   protected SparkHashTableSinkOperator() {
     super();
@@ -72,9 +74,7 @@ public class SparkHashTableSinkOperator
     byte tag = conf.getTag();
     inputOIs[tag] = inputObjInspectors[0];
     conf.setTagOrder(new Byte[]{ tag });
-    int dfsMaxReplication = hconf.getInt(DFS_REPLICATION_MAX, minReplication);
-    // minReplication value should not cross the value of dfs.replication.max
-    minReplication = Math.min(minReplication, dfsMaxReplication);
+    numReplication = (short) hconf.getInt(MAPRED_FILE_REPLICATION, DEFAULT_REPLICATION);
     htsOperator.setConf(conf);
     htsOperator.initialize(hconf, inputOIs);
   }
@@ -136,7 +136,6 @@ public class SparkHashTableSinkOperator
     String dumpFilePrefix = conf.getDumpFilePrefix();
     Path path = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
     FileSystem fs = path.getFileSystem(htsOperator.getConfiguration());
-    short replication = fs.getDefaultReplication(path);
 
     fs.mkdirs(path);  // Create the folder and its parents if not there
     while (true) {
@@ -151,9 +150,7 @@ public class SparkHashTableSinkOperator
         // No problem, use a new name
       }
     }
-    // TODO find out numOfPartitions for the big table
-    int numOfPartitions = replication;
-    replication = (short) Math.max(minReplication, numOfPartitions);
+
     htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag
       + " with group count: " + tableContainer.size() + " into file: " + path);
     try {
@@ -162,7 +159,7 @@ public class SparkHashTableSinkOperator
       ObjectOutputStream out = null;
       MapJoinTableContainerSerDe mapJoinTableSerde = htsOperator.mapJoinTableSerdes[tag];
       try {
-        os = fs.create(path, replication);
+        os = fs.create(path, numReplication);
         out = new ObjectOutputStream(new BufferedOutputStream(os, 4096));
         mapJoinTableSerde.persist(out, tableContainer);
       } finally {