Posted to commits@hive.apache.org by gu...@apache.org on 2014/10/03 21:36:09 UTC

svn commit: r1629299 - in /hive/trunk: ./ itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/

Author: gunther
Date: Fri Oct  3 19:36:09 2014
New Revision: 1629299

URL: http://svn.apache.org/r1629299
Log:
Revert HIVE-7957 - checkin incorrectly included other files. (Gunther Hagleitner)

Modified:
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
    hive/trunk/pom.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java

Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Fri Oct  3 19:36:09 2014
@@ -47,7 +47,6 @@ import org.apache.hadoop.security.SaslRp
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
@@ -130,7 +129,7 @@ public class TestHadoop20SAuthBridge ext
     }
     builder.append("127.0.1.1,");
     builder.append(InetAddress.getLocalHost().getCanonicalHostName());
-    conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName),
+    conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey(superUserShortName),
         builder.toString());
   }
 
@@ -293,7 +292,7 @@ public class TestHadoop20SAuthBridge ext
   private void setGroupsInConf(String[] groupNames, String proxyUserName)
   throws IOException {
    conf.set(
-       DefaultImpersonationProvider.getProxySuperuserGroupConfKey(proxyUserName),
+      ProxyUsers.getProxySuperuserGroupConfKey(proxyUserName),
       StringUtils.join(",", Arrays.asList(groupNames)));
     configureSuperUserIPAddresses(conf, proxyUserName);
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
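
For context: the two hunks above swap org.apache.hadoop.security.authorize.DefaultImpersonationProvider back to ProxyUsers, matching the Hadoop 2.4.0 pin restored in pom.xml below (these config-key helpers moved to DefaultImpersonationProvider in Hadoop 2.5). A minimal sketch of the restored usage, assuming the Hadoop 2.4.0 API; the "hive" superuser name and the group/host values are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.authorize.ProxyUsers;

    public class ProxyConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Resolves to hadoop.proxyuser.hive.groups under the Hadoop 2.4 API.
        conf.set(ProxyUsers.getProxySuperuserGroupConfKey("hive"), "hadoop");
        // Resolves to hadoop.proxyuser.hive.hosts.
        conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey("hive"),
            "127.0.0.1,127.0.1.1");
        // Re-read the impersonation settings, as the test does after each change.
        ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
      }
    }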

Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Fri Oct  3 19:36:09 2014
@@ -115,7 +115,7 @@
     <groovy.version>2.1.6</groovy.version>
     <hadoop-20.version>0.20.2</hadoop-20.version>
     <hadoop-20S.version>1.2.1</hadoop-20S.version>
-    <hadoop-23.version>2.5.0</hadoop-23.version>
+    <hadoop-23.version>2.4.0</hadoop-23.version>
     <hadoop.bin.path>${basedir}/${hive.path.to.root}/testutils/hadoop</hadoop.bin.path>
     <hbase.hadoop1.version>0.98.3-hadoop1</hbase.hadoop1.version>
     <hbase.hadoop2.version>0.98.3-hadoop2</hbase.hadoop2.version>
@@ -151,7 +151,7 @@
     <stax.version>1.0.1</stax.version>
     <slf4j.version>1.7.5</slf4j.version>
     <ST4.version>4.0.4</ST4.version>
-    <tez.version>0.5.1</tez.version>
+    <tez.version>0.5.0</tez.version>
     <super-csv.version>2.2.0</super-csv.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
     <snappy.version>0.2</snappy.version>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java Fri Oct  3 19:36:09 2014
@@ -60,6 +60,9 @@ public class AppMasterEventOperator exte
   protected void initDataBuffer(boolean skipPruning) throws HiveException {
     buffer = new DataOutputBuffer();
     try {
+      // where does this go to?
+      buffer.writeUTF(((TezContext) TezContext.get()).getTezProcessorContext().getTaskVertexName());
+
       // add any other header info
       getConf().writeEventHeader(buffer);
 
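For context: the reverted writeUTF call above prepended the producing vertex's name to the event payload; its counterpart, a matching in.readUTF() in DynamicPartitionPruner.processPayload, is removed in the next file. A minimal sketch of that DataOutput/DataInput header round-trip using plain java.io; the field values are illustrative, and only the column-name and skip-flag fields mirror what the diff shows writeEventHeader emitting:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class UtfHeaderSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF("Map 1");      // source vertex name (the reverted field)
        out.writeUTF("part_col");   // column name, per writeEventHeader
        out.writeBoolean(false);    // skip-pruning flag

        // Fields must be read back in exactly the order they were written.
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray()));
        String sourceName = in.readUTF();
        String columnName = in.readUTF();
        boolean skip = in.readBoolean();
        System.out.println(sourceName + " / " + columnName + " / " + skip);
      }
    }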

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Fri Oct  3 19:36:09 2014
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -60,7 +59,6 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.runtime.api.InputInitializerContext;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
@@ -79,13 +77,12 @@ public class DynamicPartitionPruner {
 
   private final BytesWritable writable = new BytesWritable();
 
-  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
-
-  private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
+  private final BlockingQueue<InputInitializerEvent> queue =
+      new LinkedBlockingQueue<InputInitializerEvent>();
 
   private int sourceInfoCount = 0;
 
-  private final Object endOfEvents = new Object();
+  private InputInitializerContext context;
 
   public DynamicPartitionPruner() {
   }
@@ -94,21 +91,8 @@ public class DynamicPartitionPruner {
       throws SerDeException, IOException,
       InterruptedException, HiveException {
 
-    synchronized(sourcesWaitingForEvents) {
-      initialize(work, jobConf);
-
-      if (sourcesWaitingForEvents.isEmpty()) {
-        return;
-      }
-
-      Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
-      for (String source : sourcesWaitingForEvents) {
-        // we need to get state transition updates for the vertices that will send
-        // events to us. once we have received all events and a vertex has succeeded,
-        // we can move to do the pruning.
-        context.registerForVertexStateUpdates(source, states);
-      }
-    }
+    this.context = context;
+    this.initialize(work, jobConf);
 
     LOG.info("Waiting for events (" + sourceInfoCount + " items) ...");
     // synchronous event processing loop. Won't return until all events have
@@ -118,7 +102,7 @@ public class DynamicPartitionPruner {
     LOG.info("Ok to proceed.");
   }
 
-  public BlockingQueue<Object> getQueue() {
+  public BlockingQueue<InputInitializerEvent> getQueue() {
     return queue;
   }
 
@@ -127,14 +111,11 @@ public class DynamicPartitionPruner {
     sourceInfoCount = 0;
   }
 
-  public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+  private void initialize(MapWork work, JobConf jobConf) throws SerDeException {
     this.clear();
     Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
-    Set<String> sources = work.getEventSourceTableDescMap().keySet();
-
-    sourcesWaitingForEvents.addAll(sources);
 
-    for (String s : sources) {
+    for (String s : work.getEventSourceTableDescMap().keySet()) {
       List<TableDesc> tables = work.getEventSourceTableDescMap().get(s);
       List<String> columnNames = work.getEventSourceColumnNameMap().get(s);
       List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s);
@@ -296,30 +277,46 @@ public class DynamicPartitionPruner {
 
   private void processEvents() throws SerDeException, IOException, InterruptedException {
     int eventCount = 0;
+    int neededEvents = getExpectedNumberOfEvents();
 
-    while (true) {
-      Object element = queue.take();
-
-      if (element == endOfEvents) {
-        // we're done processing events
-        break;
-      }
-
-      InputInitializerEvent event = (InputInitializerEvent) element;
-
+    while (neededEvents > eventCount) {
+      InputInitializerEvent event = queue.take();
       LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
           + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
-      processPayload(event.getUserPayload(), event.getSourceVertexName());
+      processPayload(event.getUserPayload());
       eventCount += 1;
+      neededEvents = getExpectedNumberOfEvents();
+      LOG.info("Needed events: " + neededEvents + ", received events: " + eventCount);
     }
-    LOG.info("Received events: " + eventCount);
   }
 
-  @SuppressWarnings("deprecation")
-  private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
-      IOException {
+  private int getExpectedNumberOfEvents() throws InterruptedException {
+    int neededEvents = 0;
+
+    boolean notInitialized;
+    do {
+      neededEvents = 0;
+      notInitialized = false;
+      for (String s : sourceInfoMap.keySet()) {
+        int multiplier = sourceInfoMap.get(s).size();
+        int taskNum = context.getVertexNumTasks(s);
+        LOG.info("Vertex " + s + " has " + taskNum + " events.");
+        if (taskNum < 0) {
+          notInitialized = true;
+          Thread.sleep(10);
+          continue;
+        }
+        neededEvents += (taskNum * multiplier);
+      }
+    } while (notInitialized);
+
+    return neededEvents;
+  }
 
+  @SuppressWarnings("deprecation")
+  private String processPayload(ByteBuffer payload) throws SerDeException, IOException {
     DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload));
+    String sourceName = in.readUTF();
     String columnName = in.readUTF();
     boolean skip = in.readBoolean();
 
@@ -393,26 +390,4 @@ public class DynamicPartitionPruner {
     }
   }
 
-  public void addEvent(InputInitializerEvent event) {
-    synchronized(sourcesWaitingForEvents) {
-      if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
-          queue.offer(event);
-      }
-    }
-  }
-
-  public void processVertex(String name) {
-    LOG.info("Vertex succeeded: " + name);
-
-    synchronized(sourcesWaitingForEvents) {
-      sourcesWaitingForEvents.remove(name);
-
-      if (sourcesWaitingForEvents.isEmpty()) {
-        // we've got what we need; mark the queue
-        queue.offer(endOfEvents);
-      } else {
-        LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " events.");
-      }
-    }
-  }
 }
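
For context: the block removed above implemented end-of-stream signalling with a private sentinel object: addEvent filters incoming events by source vertex, and processVertex enqueues endOfEvents once every producing vertex has succeeded, letting the consumer loop exit without knowing the event count up front. The restored code instead polls context.getVertexNumTasks(s) to compute the expected total. A minimal, self-contained sketch of the sentinel pattern (class and event names are illustrative):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class SentinelQueueSketch {
      // Private marker object; identity comparison distinguishes it from events.
      private static final Object END_OF_EVENTS = new Object();
      private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();

      void addEvent(String event) { queue.offer(event); }

      // Called once every producing vertex has finished.
      void allProducersDone() { queue.offer(END_OF_EVENTS); }

      void processEvents() throws InterruptedException {
        int eventCount = 0;
        while (true) {
          Object element = queue.take();
          if (element == END_OF_EVENTS) {
            break; // producers finished; no expected count needed
          }
          eventCount++;
        }
        System.out.println("Received events: " + eventCount);
      }

      public static void main(String[] args) throws InterruptedException {
        SentinelQueueSketch s = new SentinelQueueSketch();
        s.addEvent("event-1");
        s.addEvent("event-2");
        s.allProducersDone();
        s.processEvents();
      }
    }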

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Oct  3 19:36:09 2014
@@ -38,9 +38,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -244,14 +243,9 @@ public class HiveSplitGenerator extends 
   }
 
   @Override
-  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
-    pruner.processVertex(stateUpdate.getVertexName());
-  }
-
-  @Override
   public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
     for (InputInitializerEvent e : events) {
-      pruner.addEvent(e);
+      pruner.getQueue().put(e);
     }
   }
 }