You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/10/03 21:36:09 UTC
svn commit: r1629299 - in /hive/trunk: ./
itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
Author: gunther
Date: Fri Oct 3 19:36:09 2014
New Revision: 1629299
URL: http://svn.apache.org/r1629299
Log:
Revert HIVE-7957 - the check-in incorrectly included other files. (Gunther Hagleitner)
Modified:
hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
hive/trunk/pom.xml
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Fri Oct 3 19:36:09 2014
@@ -47,7 +47,6 @@ import org.apache.hadoop.security.SaslRp
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
@@ -130,7 +129,7 @@ public class TestHadoop20SAuthBridge ext
}
builder.append("127.0.1.1,");
builder.append(InetAddress.getLocalHost().getCanonicalHostName());
- conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName),
+ conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey(superUserShortName),
builder.toString());
}
@@ -293,7 +292,7 @@ public class TestHadoop20SAuthBridge ext
private void setGroupsInConf(String[] groupNames, String proxyUserName)
throws IOException {
conf.set(
- DefaultImpersonationProvider.getProxySuperuserGroupConfKey(proxyUserName),
+ ProxyUsers.getProxySuperuserGroupConfKey(proxyUserName),
StringUtils.join(",", Arrays.asList(groupNames)));
configureSuperUserIPAddresses(conf, proxyUserName);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Fri Oct 3 19:36:09 2014
@@ -115,7 +115,7 @@
<groovy.version>2.1.6</groovy.version>
<hadoop-20.version>0.20.2</hadoop-20.version>
<hadoop-20S.version>1.2.1</hadoop-20S.version>
- <hadoop-23.version>2.5.0</hadoop-23.version>
+ <hadoop-23.version>2.4.0</hadoop-23.version>
<hadoop.bin.path>${basedir}/${hive.path.to.root}/testutils/hadoop</hadoop.bin.path>
<hbase.hadoop1.version>0.98.3-hadoop1</hbase.hadoop1.version>
<hbase.hadoop2.version>0.98.3-hadoop2</hbase.hadoop2.version>
@@ -151,7 +151,7 @@
<stax.version>1.0.1</stax.version>
<slf4j.version>1.7.5</slf4j.version>
<ST4.version>4.0.4</ST4.version>
- <tez.version>0.5.1</tez.version>
+ <tez.version>0.5.0</tez.version>
<super-csv.version>2.2.0</super-csv.version>
<tempus-fugit.version>1.1</tempus-fugit.version>
<snappy.version>0.2</snappy.version>
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java Fri Oct 3 19:36:09 2014
@@ -60,6 +60,9 @@ public class AppMasterEventOperator exte
protected void initDataBuffer(boolean skipPruning) throws HiveException {
buffer = new DataOutputBuffer();
try {
+ // where does this go to?
+ buffer.writeUTF(((TezContext) TezContext.get()).getTezProcessorContext().getTaskVertexName());
+
// add any other header info
getConf().writeEventHeader(buffer);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Fri Oct 3 19:36:09 2014
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,7 +59,6 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
@@ -79,13 +77,12 @@ public class DynamicPartitionPruner {
private final BytesWritable writable = new BytesWritable();
- private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
-
- private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
+ private final BlockingQueue<InputInitializerEvent> queue =
+ new LinkedBlockingQueue<InputInitializerEvent>();
private int sourceInfoCount = 0;
- private final Object endOfEvents = new Object();
+ private InputInitializerContext context;
public DynamicPartitionPruner() {
}
@@ -94,21 +91,8 @@ public class DynamicPartitionPruner {
throws SerDeException, IOException,
InterruptedException, HiveException {
- synchronized(sourcesWaitingForEvents) {
- initialize(work, jobConf);
-
- if (sourcesWaitingForEvents.isEmpty()) {
- return;
- }
-
- Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
- for (String source : sourcesWaitingForEvents) {
- // we need to get state transition updates for the vertices that will send
- // events to us. once we have received all events and a vertex has succeeded,
- // we can move to do the pruning.
- context.registerForVertexStateUpdates(source, states);
- }
- }
+ this.context = context;
+ this.initialize(work, jobConf);
LOG.info("Waiting for events (" + sourceInfoCount + " items) ...");
// synchronous event processing loop. Won't return until all events have
@@ -118,7 +102,7 @@ public class DynamicPartitionPruner {
LOG.info("Ok to proceed.");
}
- public BlockingQueue<Object> getQueue() {
+ public BlockingQueue<InputInitializerEvent> getQueue() {
return queue;
}
@@ -127,14 +111,11 @@ public class DynamicPartitionPruner {
sourceInfoCount = 0;
}
- public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+ private void initialize(MapWork work, JobConf jobConf) throws SerDeException {
this.clear();
Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
- Set<String> sources = work.getEventSourceTableDescMap().keySet();
-
- sourcesWaitingForEvents.addAll(sources);
- for (String s : sources) {
+ for (String s : work.getEventSourceTableDescMap().keySet()) {
List<TableDesc> tables = work.getEventSourceTableDescMap().get(s);
List<String> columnNames = work.getEventSourceColumnNameMap().get(s);
List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s);
@@ -296,30 +277,46 @@ public class DynamicPartitionPruner {
private void processEvents() throws SerDeException, IOException, InterruptedException {
int eventCount = 0;
+ int neededEvents = getExpectedNumberOfEvents();
- while (true) {
- Object element = queue.take();
-
- if (element == endOfEvents) {
- // we're done processing events
- break;
- }
-
- InputInitializerEvent event = (InputInitializerEvent) element;
-
+ while (neededEvents > eventCount) {
+ InputInitializerEvent event = queue.take();
LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
+ ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
- processPayload(event.getUserPayload(), event.getSourceVertexName());
+ processPayload(event.getUserPayload());
eventCount += 1;
+ neededEvents = getExpectedNumberOfEvents();
+ LOG.info("Needed events: " + neededEvents + ", received events: " + eventCount);
}
- LOG.info("Received events: " + eventCount);
}
- @SuppressWarnings("deprecation")
- private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
- IOException {
+ private int getExpectedNumberOfEvents() throws InterruptedException {
+ int neededEvents = 0;
+
+ boolean notInitialized;
+ do {
+ neededEvents = 0;
+ notInitialized = false;
+ for (String s : sourceInfoMap.keySet()) {
+ int multiplier = sourceInfoMap.get(s).size();
+ int taskNum = context.getVertexNumTasks(s);
+ LOG.info("Vertex " + s + " has " + taskNum + " events.");
+ if (taskNum < 0) {
+ notInitialized = true;
+ Thread.sleep(10);
+ continue;
+ }
+ neededEvents += (taskNum * multiplier);
+ }
+ } while (notInitialized);
+
+ return neededEvents;
+ }
+ @SuppressWarnings("deprecation")
+ private String processPayload(ByteBuffer payload) throws SerDeException, IOException {
DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload));
+ String sourceName = in.readUTF();
String columnName = in.readUTF();
boolean skip = in.readBoolean();
@@ -393,26 +390,4 @@ public class DynamicPartitionPruner {
}
}
- public void addEvent(InputInitializerEvent event) {
- synchronized(sourcesWaitingForEvents) {
- if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
- queue.offer(event);
- }
- }
- }
-
- public void processVertex(String name) {
- LOG.info("Vertex succeeded: " + name);
-
- synchronized(sourcesWaitingForEvents) {
- sourcesWaitingForEvents.remove(name);
-
- if (sourcesWaitingForEvents.isEmpty()) {
- // we've got what we need; mark the queue
- queue.offer(endOfEvents);
- } else {
- LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " events.");
- }
- }
- }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Oct 3 19:36:09 2014
@@ -38,9 +38,8 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -244,14 +243,9 @@ public class HiveSplitGenerator extends
}
@Override
- public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
- pruner.processVertex(stateUpdate.getVertexName());
- }
-
- @Override
public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
for (InputInitializerEvent e : events) {
- pruner.addEvent(e);
+ pruner.getQueue().put(e);
}
}
}