You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by da...@apache.org on 2017/08/07 23:58:33 UTC

hive git commit: HIVE-16895: Multi-threaded execution of bootstrap dump of partitions (Anishek Agarwal, reviewed by Sankar Hariappan, Daniel Dai)

Repository: hive
Updated Branches:
  refs/heads/master 318822b18 -> 0e7ca91a9


HIVE-16895: Multi-threaded execution of bootstrap dump of partitions (Anishek Agarwal, reviewed by Sankar Hariappan, Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e7ca91a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e7ca91a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e7ca91a

Branch: refs/heads/master
Commit: 0e7ca91a9832efa5ab05c79b0e981735b89e3a72
Parents: 318822b
Author: Daniel Dai <da...@hortonworks.com>
Authored: Mon Aug 7 16:58:14 2017 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Mon Aug 7 16:58:14 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   2 +-
 .../hive/ql/parse/ExportSemanticAnalyzer.java   |   2 +-
 .../ql/parse/repl/dump/PartitionExport.java     | 116 +++++++++++++++++++
 .../hive/ql/parse/repl/dump/TableExport.java    |  22 ++--
 5 files changed, 131 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0e7ca91a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 62a65c9..c16880e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -442,6 +442,8 @@ public class HiveConf extends Configuration {
         "Inteval for cmroot cleanup thread."),
     REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/hive/repl/functions/",
         "Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"),
+    REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",5,
+        "Number of threads that will be used to dump partition data information during repl dump."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),

http://git-wip-us.apache.org/repos/asf/hive/blob/0e7ca91a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f3ba3e4..7501ed7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -235,7 +235,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
       TableExport.Paths exportPaths =
           new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf);
-      new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).run();
+      new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).write();
       REPL_STATE_LOG.info(
           "Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata and data to path {}",
           dbName, tblName, exportPaths.exportRootDir.toString());

http://git-wip-us.apache.org/repos/asf/hive/blob/0e7ca91a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 0932dff..86575e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -74,7 +74,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
     TableExport.Paths exportPaths =
         new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf);
     TableExport.AuthEntities authEntities =
-        new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).run();
+        new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).write();
     inputs.addAll(authEntities.inputs);
     outputs.addAll(authEntities.outputs);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0e7ca91a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
new file mode 100644
index 0000000..87beffa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -0,0 +1,116 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.AuthEntities;
+import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.Paths;
+
+/**
+ * This class manages writing multiple partitions _data files simultaneously.
+ * it has a blocking queue that stores partitions to be dumped via a producer thread.
+ * it has a worker thread pool that reads of the queue to perform the various tasks.
+ */
+class PartitionExport {
+  private final Paths paths;
+  private final PartitionIterable partitionIterable;
+  private final HiveConf hiveConf;
+  private final int nThreads;
+  private final AuthEntities authEntities;
+
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class);
+  private BlockingQueue<Partition> queue;
+
+  PartitionExport(Paths paths, PartitionIterable partitionIterable, HiveConf hiveConf,
+      AuthEntities authEntities) {
+    this.paths = paths;
+    this.partitionIterable = partitionIterable;
+    this.hiveConf = hiveConf;
+    this.authEntities = authEntities;
+    this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM);
+    this.queue = new ArrayBlockingQueue<>(2 * nThreads);
+  }
+
+  void write(final ReplicationSpec forReplicationSpec) throws InterruptedException {
+    ExecutorService producer = Executors.newFixedThreadPool(1);
+    producer.submit(() -> {
+      for (Partition partition : partitionIterable) {
+        try {
+          queue.put(partition);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(
+              "Error while queuing up the partitions for export of data files", e);
+        }
+      }
+    });
+    producer.shutdown();
+
+    ThreadFactory namingThreadFactory =
+        new ThreadFactoryBuilder().setNameFormat("partition-dump-thread-%d").build();
+    ExecutorService consumer = Executors.newFixedThreadPool(nThreads, namingThreadFactory);
+
+    while (!producer.isTerminated() || !queue.isEmpty()) {
+      /*
+      This is removed using a poll because there can be a case where there partitions iterator is empty
+      but because both the producer and consumer are started simultaneously the while loop will execute
+      because producer is not terminated but it wont produce anything so queue will be empty and then we
+      should only wait for a specific time before continuing, as the next loop cycle will fail.
+       */
+      Partition partition = queue.poll(1, TimeUnit.SECONDS);
+      if (partition == null) {
+        continue;
+      }
+      LOG.debug("scheduling partition dump {}", partition.getName());
+      consumer.submit(() -> {
+        String partitionName = partition.getName();
+        String threadName = Thread.currentThread().getName();
+        LOG.debug("Thread: {}, start partition dump {}", threadName, partitionName);
+        Path fromPath = partition.getDataLocation();
+        try {
+          // this the data copy
+          Path rootDataDumpDir = paths.partitionExportDir(partitionName);
+          new FileOperations(fromPath, rootDataDumpDir, hiveConf).export(forReplicationSpec);
+          authEntities.inputs.add(new ReadEntity(partition));
+          LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
+        } catch (Exception e) {
+          throw new RuntimeException("Error while export of data files", e);
+        }
+      });
+    }
+    consumer.shutdown();
+    // may be drive this via configuration as well.
+    consumer.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0e7ca91a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 4d23efc..5d7fd25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
@@ -38,8 +37,10 @@ import org.slf4j.Logger;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity;
 
@@ -70,7 +71,7 @@ public class TableExport {
     this.paths = paths;
   }
 
-  public AuthEntities run() throws SemanticException {
+  public AuthEntities write() throws SemanticException {
     if (tableSpec == null) {
       writeMetaData(null);
     } else if (shouldExport()) {
@@ -139,13 +140,7 @@ public class TableExport {
           throw new IllegalStateException(
               "partitions cannot be null for partitionTable :" + tableSpec.tableName);
         }
-        for (Partition partition : partitions) {
-          Path fromPath = partition.getDataLocation();
-          // this the data copy
-          Path rootDataDumpDir = paths.partitionExportDir(partition.getName());
-          new FileOperations(fromPath, rootDataDumpDir, conf).export(replicationSpec);
-          authEntities.inputs.add(new ReadEntity(partition));
-        }
+        new PartitionExport(paths, partitions, conf, authEntities).write(replicationSpec);
       } else {
         Path fromPath = tableSpec.tableHandle.getDataLocation();
         //this is the data copy
@@ -210,7 +205,7 @@ public class TableExport {
       }
     }
 
-    private Path partitionExportDir(String partitionName) throws SemanticException {
+    Path partitionExportDir(String partitionName) throws SemanticException {
       return exportDir(new Path(exportRootDir, partitionName));
     }
 
@@ -271,7 +266,12 @@ public class TableExport {
   }
 
   public static class AuthEntities {
-    public final Set<ReadEntity> inputs = new HashSet<>();
+    /**
+     * This is  concurrent implementation as
+     * @see org.apache.hadoop.hive.ql.parse.repl.dump.PartitionExport
+     * uses multiple threads to flush out partitions.
+     */
+    public final Set<ReadEntity> inputs = Collections.newSetFromMap(new ConcurrentHashMap<>());
     public final Set<WriteEntity> outputs = new HashSet<>();
   }
 }