You are viewing a plain text version of this content. The canonical link is not included in this plain text version.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/10/03 20:03:34 UTC
svn commit: r1629275 - in /hive/branches/branch-0.14: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
Author: gunther
Date: Fri Oct 3 18:03:33 2014
New Revision: 1629275
URL: http://svn.apache.org/r1629275
Log:
HIVE-7957: Revisit event version handling in dynamic partition pruning on Tez (Gunther Hagleitner, reviewed by Vikram Dixit K)
Modified:
hive/branches/branch-0.14/pom.xml
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
Modified: hive/branches/branch-0.14/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/pom.xml?rev=1629275&r1=1629274&r2=1629275&view=diff
==============================================================================
--- hive/branches/branch-0.14/pom.xml (original)
+++ hive/branches/branch-0.14/pom.xml Fri Oct 3 18:03:33 2014
@@ -151,7 +151,7 @@
<stax.version>1.0.1</stax.version>
<slf4j.version>1.7.5</slf4j.version>
<ST4.version>4.0.4</ST4.version>
- <tez.version>0.5.0</tez.version>
+ <tez.version>0.5.1</tez.version>
<super-csv.version>2.2.0</super-csv.version>
<tempus-fugit.version>1.1</tempus-fugit.version>
<snappy.version>0.2</snappy.version>
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java?rev=1629275&r1=1629274&r2=1629275&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java Fri Oct 3 18:03:33 2014
@@ -60,9 +60,6 @@ public class AppMasterEventOperator exte
protected void initDataBuffer(boolean skipPruning) throws HiveException {
buffer = new DataOutputBuffer();
try {
- // where does this go to?
- buffer.writeUTF(((TezContext) TezContext.get()).getTezProcessorContext().getTaskVertexName());
-
// add any other header info
getConf().writeEventHeader(buffer);
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1629275&r1=1629274&r2=1629275&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Fri Oct 3 18:03:33 2014
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
@@ -77,12 +79,13 @@ public class DynamicPartitionPruner {
private final BytesWritable writable = new BytesWritable();
- private final BlockingQueue<InputInitializerEvent> queue =
- new LinkedBlockingQueue<InputInitializerEvent>();
+ private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
+
+ private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
private int sourceInfoCount = 0;
- private InputInitializerContext context;
+ private final Object endOfEvents = new Object();
public DynamicPartitionPruner() {
}
@@ -91,8 +94,21 @@ public class DynamicPartitionPruner {
throws SerDeException, IOException,
InterruptedException, HiveException {
- this.context = context;
- this.initialize(work, jobConf);
+ synchronized(sourcesWaitingForEvents) {
+ initialize(work, jobConf);
+
+ if (sourcesWaitingForEvents.isEmpty()) {
+ return;
+ }
+
+ Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
+ for (String source : sourcesWaitingForEvents) {
+ // we need to get state transition updates for the vertices that will send
+ // events to us. once we have received all events and a vertex has succeeded,
+ // we can move to do the pruning.
+ context.registerForVertexStateUpdates(source, states);
+ }
+ }
LOG.info("Waiting for events (" + sourceInfoCount + " items) ...");
// synchronous event processing loop. Won't return until all events have
@@ -102,7 +118,7 @@ public class DynamicPartitionPruner {
LOG.info("Ok to proceed.");
}
- public BlockingQueue<InputInitializerEvent> getQueue() {
+ public BlockingQueue<Object> getQueue() {
return queue;
}
@@ -111,11 +127,14 @@ public class DynamicPartitionPruner {
sourceInfoCount = 0;
}
- private void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+ public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
this.clear();
Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
+ Set<String> sources = work.getEventSourceTableDescMap().keySet();
+
+ sourcesWaitingForEvents.addAll(sources);
- for (String s : work.getEventSourceTableDescMap().keySet()) {
+ for (String s : sources) {
List<TableDesc> tables = work.getEventSourceTableDescMap().get(s);
List<String> columnNames = work.getEventSourceColumnNameMap().get(s);
List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s);
@@ -277,46 +296,30 @@ public class DynamicPartitionPruner {
private void processEvents() throws SerDeException, IOException, InterruptedException {
int eventCount = 0;
- int neededEvents = getExpectedNumberOfEvents();
- while (neededEvents > eventCount) {
- InputInitializerEvent event = queue.take();
+ while (true) {
+ Object element = queue.take();
+
+ if (element == endOfEvents) {
+ // we're done processing events
+ break;
+ }
+
+ InputInitializerEvent event = (InputInitializerEvent) element;
+
LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
+ ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
- processPayload(event.getUserPayload());
+ processPayload(event.getUserPayload(), event.getSourceVertexName());
eventCount += 1;
- neededEvents = getExpectedNumberOfEvents();
- LOG.info("Needed events: " + neededEvents + ", received events: " + eventCount);
}
- }
-
- private int getExpectedNumberOfEvents() throws InterruptedException {
- int neededEvents = 0;
-
- boolean notInitialized;
- do {
- neededEvents = 0;
- notInitialized = false;
- for (String s : sourceInfoMap.keySet()) {
- int multiplier = sourceInfoMap.get(s).size();
- int taskNum = context.getVertexNumTasks(s);
- LOG.info("Vertex " + s + " has " + taskNum + " events.");
- if (taskNum < 0) {
- notInitialized = true;
- Thread.sleep(10);
- continue;
- }
- neededEvents += (taskNum * multiplier);
- }
- } while (notInitialized);
-
- return neededEvents;
+ LOG.info("Received events: " + eventCount);
}
@SuppressWarnings("deprecation")
- private String processPayload(ByteBuffer payload) throws SerDeException, IOException {
+ private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
+ IOException {
+
DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload));
- String sourceName = in.readUTF();
String columnName = in.readUTF();
boolean skip = in.readBoolean();
@@ -390,4 +393,26 @@ public class DynamicPartitionPruner {
}
}
+ public void addEvent(InputInitializerEvent event) {
+ synchronized(sourcesWaitingForEvents) {
+ if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
+ queue.offer(event);
+ }
+ }
+ }
+
+ public void processVertex(String name) {
+ LOG.info("Vertex succeeded: " + name);
+
+ synchronized(sourcesWaitingForEvents) {
+ sourcesWaitingForEvents.remove(name);
+
+ if (sourcesWaitingForEvents.isEmpty()) {
+ // we've got what we need; mark the queue
+ queue.offer(endOfEvents);
+ } else {
+ LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " events.");
+ }
+ }
+ }
}
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1629275&r1=1629274&r2=1629275&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Oct 3 18:03:33 2014
@@ -38,8 +38,9 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -243,9 +244,14 @@ public class HiveSplitGenerator extends
}
@Override
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+ pruner.processVertex(stateUpdate.getVertexName());
+ }
+
+ @Override
public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
for (InputInitializerEvent e : events) {
- pruner.getQueue().put(e);
+ pruner.addEvent(e);
}
}
}