Posted to commits@hive.apache.org by da...@apache.org on 2017/08/07 23:58:33 UTC
hive git commit: HIVE-16895: Multi-threaded execution of bootstrap dump of partitions (Anishek Agarwal, reviewed by Sankar Hariappan, Daniel Dai)
Repository: hive
Updated Branches:
refs/heads/master 318822b18 -> 0e7ca91a9
HIVE-16895: Multi-threaded execution of bootstrap dump of partitions (Anishek Agarwal, reviewed by Sankar Hariappan, Daniel Dai)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e7ca91a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e7ca91a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e7ca91a
Branch: refs/heads/master
Commit: 0e7ca91a9832efa5ab05c79b0e981735b89e3a72
Parents: 318822b
Author: Daniel Dai <da...@hortonworks.com>
Authored: Mon Aug 7 16:58:14 2017 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Mon Aug 7 16:58:14 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 2 +-
.../hive/ql/parse/ExportSemanticAnalyzer.java | 2 +-
.../ql/parse/repl/dump/PartitionExport.java | 116 +++++++++++++++++++
.../hive/ql/parse/repl/dump/TableExport.java | 22 ++--
5 files changed, 131 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0e7ca91a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 62a65c9..c16880e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -442,6 +442,8 @@ public class HiveConf extends Configuration {
"Inteval for cmroot cleanup thread."),
REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/hive/repl/functions/",
"Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"),
+ REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",5,
+ "Number of threads that will be used to dump partition data information during repl dump."),
LOCALSCRATCHDIR("hive.exec.local.scratchdir",
"${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
"Local scratch space for Hive jobs"),
http://git-wip-us.apache.org/repos/asf/hive/blob/0e7ca91a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f3ba3e4..7501ed7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -235,7 +235,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
TableExport.Paths exportPaths =
new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf);
- new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).run();
+ new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).write();
REPL_STATE_LOG.info(
"Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata and data to path {}",
dbName, tblName, exportPaths.exportRootDir.toString());
http://git-wip-us.apache.org/repos/asf/hive/blob/0e7ca91a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 0932dff..86575e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -74,7 +74,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
TableExport.Paths exportPaths =
new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf);
TableExport.AuthEntities authEntities =
- new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).run();
+ new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).write();
inputs.addAll(authEntities.inputs);
outputs.addAll(authEntities.outputs);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0e7ca91a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
new file mode 100644
index 0000000..87beffa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -0,0 +1,116 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.AuthEntities;
+import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.Paths;
+
+/**
+ * This class manages writing the _data files of multiple partitions simultaneously.
+ * A producer thread feeds the partitions to be dumped into a blocking queue, and a
+ * pool of worker threads reads off the queue to perform the export tasks.
+ */
+class PartitionExport {
+ private final Paths paths;
+ private final PartitionIterable partitionIterable;
+ private final HiveConf hiveConf;
+ private final int nThreads;
+ private final AuthEntities authEntities;
+
+ private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class);
+ private BlockingQueue<Partition> queue;
+
+ PartitionExport(Paths paths, PartitionIterable partitionIterable, HiveConf hiveConf,
+ AuthEntities authEntities) {
+ this.paths = paths;
+ this.partitionIterable = partitionIterable;
+ this.hiveConf = hiveConf;
+ this.authEntities = authEntities;
+ this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM);
+ this.queue = new ArrayBlockingQueue<>(2 * nThreads);
+ }
+
+ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException {
+ ExecutorService producer = Executors.newFixedThreadPool(1);
+ producer.submit(() -> {
+ for (Partition partition : partitionIterable) {
+ try {
+ queue.put(partition);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ "Error while queuing up the partitions for export of data files", e);
+ }
+ }
+ });
+ producer.shutdown();
+
+ ThreadFactory namingThreadFactory =
+ new ThreadFactoryBuilder().setNameFormat("partition-dump-thread-%d").build();
+ ExecutorService consumer = Executors.newFixedThreadPool(nThreads, namingThreadFactory);
+
+ while (!producer.isTerminated() || !queue.isEmpty()) {
+ /*
+ We poll with a timeout here rather than block indefinitely because the partitions
+ iterator can be empty: the producer and consumer are started simultaneously, so this
+ loop may run while the producer has not yet terminated even though it will never
+ produce anything. The queue then stays empty, and we should wait only a bounded time
+ before re-checking the loop condition.
+ */
+ Partition partition = queue.poll(1, TimeUnit.SECONDS);
+ if (partition == null) {
+ continue;
+ }
+ LOG.debug("scheduling partition dump {}", partition.getName());
+ consumer.submit(() -> {
+ String partitionName = partition.getName();
+ String threadName = Thread.currentThread().getName();
+ LOG.debug("Thread: {}, start partition dump {}", threadName, partitionName);
+ Path fromPath = partition.getDataLocation();
+ try {
+ // this is the data copy
+ Path rootDataDumpDir = paths.partitionExportDir(partitionName);
+ new FileOperations(fromPath, rootDataDumpDir, hiveConf).export(forReplicationSpec);
+ authEntities.inputs.add(new ReadEntity(partition));
+ LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while export of data files", e);
+ }
+ });
+ }
+ consumer.shutdown();
+ // maybe drive this timeout via configuration as well.
+ consumer.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ }
+}
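Stripped of the Hive types, the design above is a bounded producer/consumer pipeline. A minimal, self-contained sketch (hypothetical class, strings standing in for Partition objects) of the same shape, timed poll included:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDump {
  public static void main(String[] args) throws InterruptedException {
    int nThreads = 5; // mirrors the config default above
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(2 * nThreads);
    List<String> partitions = Arrays.asList("p=2017-08-01", "p=2017-08-02", "p=2017-08-03");

    // Single producer: put() blocks once 2 * nThreads items are queued, so an
    // arbitrarily long partition list never has to sit in memory all at once.
    ExecutorService producer = Executors.newFixedThreadPool(1);
    producer.submit(() -> {
      for (String partition : partitions) {
        try {
          queue.put(partition);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    });
    producer.shutdown();

    // Consumer pool: the timed poll() lets this loop re-check its exit condition
    // instead of blocking forever when the producer finishes without producing.
    ExecutorService consumers = Executors.newFixedThreadPool(nThreads);
    while (!producer.isTerminated() || !queue.isEmpty()) {
      String partition = queue.poll(1, TimeUnit.SECONDS);
      if (partition == null) {
        continue;
      }
      consumers.submit(() -> System.out.println(
          Thread.currentThread().getName() + " dumped " + partition));
    }
    consumers.shutdown();
    consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  }
}

The 2 * nThreads queue bound keeps the producer only slightly ahead of the workers, which is why the patch can iterate huge partition lists without holding them all in memory.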
http://git-wip-us.apache.org/repos/asf/hive/blob/0e7ca91a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 4d23efc..5d7fd25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.parse.EximUtil;
@@ -38,8 +37,10 @@ import org.slf4j.Logger;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity;
@@ -70,7 +71,7 @@ public class TableExport {
this.paths = paths;
}
- public AuthEntities run() throws SemanticException {
+ public AuthEntities write() throws SemanticException {
if (tableSpec == null) {
writeMetaData(null);
} else if (shouldExport()) {
@@ -139,13 +140,7 @@ public class TableExport {
throw new IllegalStateException(
"partitions cannot be null for partitionTable :" + tableSpec.tableName);
}
- for (Partition partition : partitions) {
- Path fromPath = partition.getDataLocation();
- // this the data copy
- Path rootDataDumpDir = paths.partitionExportDir(partition.getName());
- new FileOperations(fromPath, rootDataDumpDir, conf).export(replicationSpec);
- authEntities.inputs.add(new ReadEntity(partition));
- }
+ new PartitionExport(paths, partitions, conf, authEntities).write(replicationSpec);
} else {
Path fromPath = tableSpec.tableHandle.getDataLocation();
//this is the data copy
@@ -210,7 +205,7 @@ public class TableExport {
}
}
- private Path partitionExportDir(String partitionName) throws SemanticException {
+ Path partitionExportDir(String partitionName) throws SemanticException {
return exportDir(new Path(exportRootDir, partitionName));
}
@@ -271,7 +266,12 @@ public class TableExport {
}
public static class AuthEntities {
- public final Set<ReadEntity> inputs = new HashSet<>();
+ /**
+ * This is a concurrent implementation because
+ * {@link org.apache.hadoop.hive.ql.parse.repl.dump.PartitionExport}
+ * uses multiple threads to flush out partitions.
+ */
+ public final Set<ReadEntity> inputs = Collections.newSetFromMap(new ConcurrentHashMap<>());
public final Set<WriteEntity> outputs = new HashSet<>();
}
}
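The swap from HashSet matters because the worker threads in PartitionExport call inputs.add() concurrently. A minimal sketch (hypothetical class name) of the Collections.newSetFromMap idiom the patch adopts:

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentSetDemo {
  public static void main(String[] args) throws InterruptedException {
    // A Set view over a ConcurrentHashMap: add() is thread-safe, unlike a plain
    // HashSet, which can lose or corrupt entries under concurrent writes.
    Set<String> inputs = Collections.newSetFromMap(new ConcurrentHashMap<>());
    Thread t1 = new Thread(() -> inputs.add("sales@p=2017-08-01"));
    Thread t2 = new Thread(() -> inputs.add("sales@p=2017-08-02"));
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println(inputs); // both entries present, no external locking needed
  }
}

Note that outputs stays a plain HashSet: only the partition-dump workers write to inputs, so only that set needs the concurrent backing.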