You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by su...@apache.org on 2017/08/08 17:28:42 UTC
hive git commit: HIVE-16758: Better Select Number of Replications (BELUGA BEHR, reviewed by Chao Sun)
Repository: hive
Updated Branches:
refs/heads/master f778e8c50 -> f067df6f5
HIVE-16758: Better Select Number of Replications (BELUGA BEHR, reviewed by Chao Sun)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f067df6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f067df6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f067df6f
Branch: refs/heads/master
Commit: f067df6f5817429d52a96fb78cdbcfbe83c1f497
Parents: f778e8c
Author: BELUGA BEHR <da...@gmail.com>
Authored: Tue Aug 8 10:27:18 2017 -0700
Committer: Chao Sun <su...@apache.org>
Committed: Tue Aug 8 10:27:18 2017 -0700
----------------------------------------------------------------------
.../hive/ql/exec/SparkHashTableSinkOperator.java | 17 +++++++----------
1 file changed, 7 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f067df6f/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
index c3b1d0a..7c1b714 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
@@ -49,11 +49,13 @@ public class SparkHashTableSinkOperator
private final String CLASS_NAME = this.getClass().getName();
private final transient PerfLogger perfLogger = SessionState.getPerfLogger();
protected static final Logger LOG = LoggerFactory.getLogger(SparkHashTableSinkOperator.class.getName());
- public static final String DFS_REPLICATION_MAX = "dfs.replication.max";
- private int minReplication = 10;
+ private static final String MAPRED_FILE_REPLICATION = "mapreduce.client.submit.file.replication";
+ private static final int DEFAULT_REPLICATION = 10;
private final HashTableSinkOperator htsOperator;
+ private short numReplication;
+
/** Kryo ctor. */
protected SparkHashTableSinkOperator() {
super();
@@ -72,9 +74,7 @@ public class SparkHashTableSinkOperator
byte tag = conf.getTag();
inputOIs[tag] = inputObjInspectors[0];
conf.setTagOrder(new Byte[]{ tag });
- int dfsMaxReplication = hconf.getInt(DFS_REPLICATION_MAX, minReplication);
- // minReplication value should not cross the value of dfs.replication.max
- minReplication = Math.min(minReplication, dfsMaxReplication);
+ numReplication = (short) hconf.getInt(MAPRED_FILE_REPLICATION, DEFAULT_REPLICATION);
htsOperator.setConf(conf);
htsOperator.initialize(hconf, inputOIs);
}
@@ -136,7 +136,6 @@ public class SparkHashTableSinkOperator
String dumpFilePrefix = conf.getDumpFilePrefix();
Path path = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
FileSystem fs = path.getFileSystem(htsOperator.getConfiguration());
- short replication = fs.getDefaultReplication(path);
fs.mkdirs(path); // Create the folder and its parents if not there
while (true) {
@@ -151,9 +150,7 @@ public class SparkHashTableSinkOperator
// No problem, use a new name
}
}
- // TODO find out numOfPartitions for the big table
- int numOfPartitions = replication;
- replication = (short) Math.max(minReplication, numOfPartitions);
+
htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag
+ " with group count: " + tableContainer.size() + " into file: " + path);
try {
@@ -162,7 +159,7 @@ public class SparkHashTableSinkOperator
ObjectOutputStream out = null;
MapJoinTableContainerSerDe mapJoinTableSerde = htsOperator.mapJoinTableSerdes[tag];
try {
- os = fs.create(path, replication);
+ os = fs.create(path, numReplication);
out = new ObjectOutputStream(new BufferedOutputStream(os, 4096));
mapJoinTableSerde.persist(out, tableContainer);
} finally {