You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2014/08/20 04:16:39 UTC
svn commit: r1619023 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/tez/
Author: knoguchi
Date: Wed Aug 20 02:16:38 2014
New Revision: 1619023
URL: http://svn.apache.org/r1619023
Log:
PIG-4132: TEZ-1246 and TEZ-1390 broke a build (knoguchi)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1619023&r1=1619022&r2=1619023&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug 20 02:16:38 2014
@@ -64,6 +64,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4132: TEZ-1246 and TEZ-1390 broke a build (knoguchi)
+
PIG-4129: Pig -Dhadoopversion=23 compile fail after TEZ-1426 (daijy)
PIG-4127: Build failure due to TEZ-1132 and TEZ-1416 (lbendig)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java?rev=1619023&r1=1619022&r2=1619023&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java Wed Aug 20 02:16:38 2014
@@ -91,7 +91,7 @@ public class PartitionerDefinedVertexMan
new HashMap<String, EdgeManagerPluginDescriptor>();
for(String vertex : getContext().getInputVertexEdgeProperties().keySet()) {
EdgeManagerPluginDescriptor edgeManagerDescriptor =
- new EdgeManagerPluginDescriptor(ScatterGatherEdgeManager.class.getName());
+ EdgeManagerPluginDescriptor.create(ScatterGatherEdgeManager.class.getName());
edgeManagers.put(vertex, edgeManagerDescriptor);
}
getContext().setVertexParallelism(dynamicParallelism, null, edgeManagers, null);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1619023&r1=1619022&r2=1619023&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Wed Aug 20 02:16:38 2014
@@ -227,7 +227,7 @@ public class PigProcessor extends Abstra
String sortingVertex = conf.get(SORT_VERTEX);
// Should contain only 1 output for sampleAggregation job
LOG.info("Sending numParallelism " + parallelism + " to " + sortingVertex);
- VertexManagerEvent vmEvent = new VertexManagerEvent(
+ VertexManagerEvent vmEvent = VertexManagerEvent.create(
sortingVertex, Ints.toByteArray(parallelism));
List<Event> events = Lists.newArrayListWithCapacity(1);
events.add(vmEvent);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1619023&r1=1619022&r2=1619023&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Aug 20 02:16:38 2014
@@ -209,7 +209,7 @@ public class TezDagBuilder extends TezOp
groupMembers[i] = from;
} else {
EdgeProperty prop = newEdge(pred, tezOp);
- Edge edge = new Edge(from, to, prop);
+ Edge edge = Edge.create(from, to, prop);
dag.addEdge(edge);
}
}
@@ -227,7 +227,7 @@ public class TezDagBuilder extends TezOp
if (store != null) {
vertexGroup.addDataSink(store.getOperatorKey().toString(),
new DataSinkDescriptor(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
- new OutputCommitterDescriptor(MROutputCommitter.class.getName()), dag.getCredentials()));
+ OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials()));
}
}
}
@@ -247,8 +247,8 @@ public class TezDagBuilder extends TezOp
groupInputClass = OrderedGroupedMergedKVInput.class.getName();
}
- return new GroupInputEdge(from, to, edgeProperty,
- new InputDescriptor(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
+ return GroupInputEdge.create(from, to, edgeProperty,
+ InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
}
/**
@@ -264,8 +264,8 @@ public class TezDagBuilder extends TezOp
TezEdgeDescriptor edge = to.inEdges.get(from.getOperatorKey());
PhysicalPlan combinePlan = edge.combinePlan;
- InputDescriptor in = new InputDescriptor(edge.inputClassName);
- OutputDescriptor out = new OutputDescriptor(edge.outputClassName);
+ InputDescriptor in = InputDescriptor.create(edge.inputClassName);
+ OutputDescriptor out = OutputDescriptor.create(edge.outputClassName);
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
if (!combinePlan.isEmpty()) {
@@ -344,11 +344,11 @@ public class TezDagBuilder extends TezOp
if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
- return new EdgeProperty((EdgeManagerPluginDescriptor)null,
+ return EdgeProperty.create((EdgeManagerPluginDescriptor)null,
edge.dataSourceType, edge.schedulingType, out, in);
}
- return new EdgeProperty(edge.dataMovementType, edge.dataSourceType,
+ return EdgeProperty.create(edge.dataMovementType, edge.dataSourceType,
edge.schedulingType, out, in);
}
@@ -380,7 +380,7 @@ public class TezDagBuilder extends TezOp
private Vertex newVertex(TezOperator tezOp, boolean isMap) throws IOException,
ClassNotFoundException, InterruptedException {
- ProcessorDescriptor procDesc = new ProcessorDescriptor(
+ ProcessorDescriptor procDesc = ProcessorDescriptor.create(
tezOp.getProcessorName());
// Pass physical plans to vertex as user payload.
@@ -563,7 +563,7 @@ public class TezDagBuilder extends TezOp
UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
procDesc.setUserPayload(userPayload);
- Vertex vertex = new Vertex(tezOp.getOperatorKey().toString(), procDesc, tezOp.getVertexParallelism(),
+ Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, tezOp.getVertexParallelism(),
isMap ? MRHelpers.getResourceForMRMapper(globalConf) : MRHelpers.getResourceForMRReducer(globalConf));
Map<String, String> taskEnv = new HashMap<String, String>();
@@ -595,13 +595,13 @@ public class TezDagBuilder extends TezOp
// TODO: These should get the globalConf, or a merged version that
// keeps settings like pig.maxCombinedSplitSize
- vertex.setLocationHint(new VertexLocationHint(tezOp.getLoaderInfo().getInputSplitInfo().getTaskLocationHints()));
+ vertex.setLocationHint(VertexLocationHint.create(tezOp.getLoaderInfo().getInputSplitInfo().getTaskLocationHints()));
vertex.addDataSource(ld.getOperatorKey().toString(),
- new DataSourceDescriptor(new InputDescriptor(MRInput.class.getName())
- .setUserPayload(new UserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
- .setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
- .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteArray())),
- new InputInitializerDescriptor(MRInputSplitDistributor.class.getName()), dag.getCredentials()));
+ DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
+ .setUserPayload(UserPayload.create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+ .setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
+ .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer())),
+ InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()), dag.getCredentials()));
}
for (POStore store : stores) {
@@ -616,7 +616,7 @@ public class TezDagBuilder extends TezOp
outputPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
ObjectSerializer.serialize(singleStore));
- OutputDescriptor storeOutDescriptor = new OutputDescriptor(
+ OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
MROutput.class.getName()).setUserPayload(TezUtils
.createUserPayloadFromConf(outputPayLoad));
if (tezOp.getVertexGroupStores() != null) {
@@ -629,7 +629,7 @@ public class TezDagBuilder extends TezOp
}
vertex.addDataSink(store.getOperatorKey().toString(),
new DataSinkDescriptor(storeOutDescriptor,
- new OutputCommitterDescriptor(MROutputCommitter.class.getName()),
+ OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
dag.getCredentials()));
}
@@ -647,7 +647,7 @@ public class TezDagBuilder extends TezOp
// Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able
// to decrease/increase parallelism of sorting vertex dynamically
// based on the numQuantiles calculated by sample aggregation vertex
- vertex.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
+ vertex.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
PartitionerDefinedVertexManager.class.getName()));
log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
} else {
@@ -664,7 +664,7 @@ public class TezDagBuilder extends TezOp
if (containScatterGather && !containCustomPartitioner) {
// Use auto-parallelism feature of ShuffleVertexManager to dynamically
// reduce the parallelism of the vertex
- VertexManagerPluginDescriptor vmPluginDescriptor = new VertexManagerPluginDescriptor(
+ VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(
ShuffleVertexManager.class.getName());
Configuration vmPluginConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1619023&r1=1619022&r2=1619023&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Wed Aug 20 02:16:38 2014
@@ -84,7 +84,7 @@ public class TezSessionManager {
throws TezException, IOException, InterruptedException {
TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
String jobName = conf.get(PigContext.JOB_NAME, "pig");
- TezClient tezClient = new TezClient(jobName, amConf, true, requestedAMResources, creds);
+ TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
tezClient.start();
TezAppMasterStatus appMasterStatus = tezClient.getAppMasterStatus();
if (appMasterStatus.equals(TezAppMasterStatus.SHUTDOWN)) {