You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/10/03 20:03:34 UTC

svn commit: r1629275 - in /hive/branches/branch-0.14: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/

Author: gunther
Date: Fri Oct  3 18:03:33 2014
New Revision: 1629275

URL: http://svn.apache.org/r1629275
Log:
HIVE-7957: Revisit event version handling in dynamic partition pruning on Tez (Gunther Hagleitner, reviewed by Vikram Dixit K)

Modified:
    hive/branches/branch-0.14/pom.xml
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java

Modified: hive/branches/branch-0.14/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/pom.xml?rev=1629275&r1=1629274&r2=1629275&view=diff
==============================================================================
--- hive/branches/branch-0.14/pom.xml (original)
+++ hive/branches/branch-0.14/pom.xml Fri Oct  3 18:03:33 2014
@@ -151,7 +151,7 @@
     <stax.version>1.0.1</stax.version>
     <slf4j.version>1.7.5</slf4j.version>
     <ST4.version>4.0.4</ST4.version>
-    <tez.version>0.5.0</tez.version>
+    <tez.version>0.5.1</tez.version>
     <super-csv.version>2.2.0</super-csv.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
     <snappy.version>0.2</snappy.version>

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java?rev=1629275&r1=1629274&r2=1629275&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java Fri Oct  3 18:03:33 2014
@@ -60,9 +60,6 @@ public class AppMasterEventOperator exte
   protected void initDataBuffer(boolean skipPruning) throws HiveException {
     buffer = new DataOutputBuffer();
     try {
-      // where does this go to?
-      buffer.writeUTF(((TezContext) TezContext.get()).getTezProcessorContext().getTaskVertexName());
-
       // add any other header info
       getConf().writeEventHeader(buffer);
 

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1629275&r1=1629274&r2=1629275&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Fri Oct  3 18:03:33 2014
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -59,6 +60,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.runtime.api.InputInitializerContext;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
@@ -77,12 +79,13 @@ public class DynamicPartitionPruner {
 
   private final BytesWritable writable = new BytesWritable();
 
-  private final BlockingQueue<InputInitializerEvent> queue =
-      new LinkedBlockingQueue<InputInitializerEvent>();
+  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
+
+  private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
 
   private int sourceInfoCount = 0;
 
-  private InputInitializerContext context;
+  private final Object endOfEvents = new Object();
 
   public DynamicPartitionPruner() {
   }
@@ -91,8 +94,21 @@ public class DynamicPartitionPruner {
       throws SerDeException, IOException,
       InterruptedException, HiveException {
 
-    this.context = context;
-    this.initialize(work, jobConf);
+    synchronized(sourcesWaitingForEvents) {
+      initialize(work, jobConf);
+
+      if (sourcesWaitingForEvents.isEmpty()) {
+        return;
+      }
+
+      Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
+      for (String source : sourcesWaitingForEvents) {
+        // we need to get state transition updates for the vertices that will send
+        // events to us. once all of these vertices have reported success, the end of
+        // the event stream is signalled and we can move on to do the pruning.
+        context.registerForVertexStateUpdates(source, states);
+      }
+    }
 
     LOG.info("Waiting for events (" + sourceInfoCount + " items) ...");
     // synchronous event processing loop. Won't return until all events have
@@ -102,7 +118,7 @@ public class DynamicPartitionPruner {
     LOG.info("Ok to proceed.");
   }
 
-  public BlockingQueue<InputInitializerEvent> getQueue() {
+  public BlockingQueue<Object> getQueue() {
     return queue;
   }
 
@@ -111,11 +127,14 @@ public class DynamicPartitionPruner {
     sourceInfoCount = 0;
   }
 
-  private void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+  public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
     this.clear();
     Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
+    Set<String> sources = work.getEventSourceTableDescMap().keySet();
+
+    sourcesWaitingForEvents.addAll(sources);
 
-    for (String s : work.getEventSourceTableDescMap().keySet()) {
+    for (String s : sources) {
       List<TableDesc> tables = work.getEventSourceTableDescMap().get(s);
       List<String> columnNames = work.getEventSourceColumnNameMap().get(s);
       List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s);
@@ -277,46 +296,30 @@ public class DynamicPartitionPruner {
 
   private void processEvents() throws SerDeException, IOException, InterruptedException {
     int eventCount = 0;
-    int neededEvents = getExpectedNumberOfEvents();
 
-    while (neededEvents > eventCount) {
-      InputInitializerEvent event = queue.take();
+    while (true) {
+      Object element = queue.take();
+
+      if (element == endOfEvents) {
+        // we're done processing events
+        break;
+      }
+
+      InputInitializerEvent event = (InputInitializerEvent) element;
+
       LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
           + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
-      processPayload(event.getUserPayload());
+      processPayload(event.getUserPayload(), event.getSourceVertexName());
       eventCount += 1;
-      neededEvents = getExpectedNumberOfEvents();
-      LOG.info("Needed events: " + neededEvents + ", received events: " + eventCount);
     }
-  }
-
-  private int getExpectedNumberOfEvents() throws InterruptedException {
-    int neededEvents = 0;
-
-    boolean notInitialized;
-    do {
-      neededEvents = 0;
-      notInitialized = false;
-      for (String s : sourceInfoMap.keySet()) {
-        int multiplier = sourceInfoMap.get(s).size();
-        int taskNum = context.getVertexNumTasks(s);
-        LOG.info("Vertex " + s + " has " + taskNum + " events.");
-        if (taskNum < 0) {
-          notInitialized = true;
-          Thread.sleep(10);
-          continue;
-        }
-        neededEvents += (taskNum * multiplier);
-      }
-    } while (notInitialized);
-
-    return neededEvents;
+    LOG.info("Received events: " + eventCount);
   }
 
   @SuppressWarnings("deprecation")
-  private String processPayload(ByteBuffer payload) throws SerDeException, IOException {
+  private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
+      IOException {
+
     DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload));
-    String sourceName = in.readUTF();
     String columnName = in.readUTF();
     boolean skip = in.readBoolean();
 
@@ -390,4 +393,26 @@ public class DynamicPartitionPruner {
     }
   }
 
+  public void addEvent(InputInitializerEvent event) {
+    synchronized(sourcesWaitingForEvents) {
+      if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
+          queue.offer(event);
+      }
+    }
+  }
+
+  public void processVertex(String name) {
+    LOG.info("Vertex succeeded: " + name);
+
+    synchronized(sourcesWaitingForEvents) {
+      sourcesWaitingForEvents.remove(name);
+
+      if (sourcesWaitingForEvents.isEmpty()) {
+        // no sources left to wait for; enqueue the end-of-events marker so processEvents() stops
+        queue.offer(endOfEvents);
+      } else {
+        LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " events.");
+      }
+    }
+  }
 }

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1629275&r1=1629274&r2=1629275&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Oct  3 18:03:33 2014
@@ -38,8 +38,9 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -243,9 +244,14 @@ public class HiveSplitGenerator extends 
   }
 
   @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    pruner.processVertex(stateUpdate.getVertexName());
+  }
+
+  @Override
   public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
     for (InputInitializerEvent e : events) {
-      pruner.getQueue().put(e);
+      pruner.addEvent(e);
     }
   }
 }