You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2014/08/20 04:16:39 UTC

svn commit: r1619023 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/

Author: knoguchi
Date: Wed Aug 20 02:16:38 2014
New Revision: 1619023

URL: http://svn.apache.org/r1619023
Log:
PIG-4132: TEZ-1246 and TEZ-1390 broke a build (knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1619023&r1=1619022&r2=1619023&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug 20 02:16:38 2014
@@ -64,6 +64,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4132: TEZ-1246 and TEZ-1390 broke a build (knoguchi)
+
 PIG-4129: Pig -Dhadoopversion=23 compile fail after TEZ-1426 (daijy)
 
 PIG-4127: Build failure due to TEZ-1132 and TEZ-1416 (lbendig)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java?rev=1619023&r1=1619022&r2=1619023&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java Wed Aug 20 02:16:38 2014
@@ -91,7 +91,7 @@ public class PartitionerDefinedVertexMan
                     new HashMap<String, EdgeManagerPluginDescriptor>();
                 for(String vertex : getContext().getInputVertexEdgeProperties().keySet()) {
                     EdgeManagerPluginDescriptor edgeManagerDescriptor =
-                            new EdgeManagerPluginDescriptor(ScatterGatherEdgeManager.class.getName());
+                            EdgeManagerPluginDescriptor.create(ScatterGatherEdgeManager.class.getName());
                     edgeManagers.put(vertex, edgeManagerDescriptor);
                 }
                 getContext().setVertexParallelism(dynamicParallelism, null, edgeManagers, null);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1619023&r1=1619022&r2=1619023&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Wed Aug 20 02:16:38 2014
@@ -227,7 +227,7 @@ public class PigProcessor extends Abstra
                 String sortingVertex = conf.get(SORT_VERTEX);
                 // Should contain only 1 output for sampleAggregation job
                 LOG.info("Sending numParallelism " + parallelism + " to " + sortingVertex);
-                VertexManagerEvent vmEvent = new VertexManagerEvent(
+                VertexManagerEvent vmEvent = VertexManagerEvent.create(
                         sortingVertex, Ints.toByteArray(parallelism));
                 List<Event> events = Lists.newArrayListWithCapacity(1);
                 events.add(vmEvent);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1619023&r1=1619022&r2=1619023&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Aug 20 02:16:38 2014
@@ -209,7 +209,7 @@ public class TezDagBuilder extends TezOp
                             groupMembers[i] = from;
                         } else {
                             EdgeProperty prop = newEdge(pred, tezOp);
-                            Edge edge = new Edge(from, to, prop);
+                            Edge edge = Edge.create(from, to, prop);
                             dag.addEdge(edge);
                         }
                     }
@@ -227,7 +227,7 @@ public class TezDagBuilder extends TezOp
                 if (store != null) {
                     vertexGroup.addDataSink(store.getOperatorKey().toString(),
                             new DataSinkDescriptor(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
-                            new OutputCommitterDescriptor(MROutputCommitter.class.getName()), dag.getCredentials()));
+                            OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials()));
                 }
             }
         }
@@ -247,8 +247,8 @@ public class TezDagBuilder extends TezOp
             groupInputClass = OrderedGroupedMergedKVInput.class.getName();
         }
 
-        return new GroupInputEdge(from, to, edgeProperty,
-                new InputDescriptor(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
+        return GroupInputEdge.create(from, to, edgeProperty,
+                InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
     }
 
     /**
@@ -264,8 +264,8 @@ public class TezDagBuilder extends TezOp
         TezEdgeDescriptor edge = to.inEdges.get(from.getOperatorKey());
         PhysicalPlan combinePlan = edge.combinePlan;
 
-        InputDescriptor in = new InputDescriptor(edge.inputClassName);
-        OutputDescriptor out = new OutputDescriptor(edge.outputClassName);
+        InputDescriptor in = InputDescriptor.create(edge.inputClassName);
+        OutputDescriptor out = OutputDescriptor.create(edge.outputClassName);
 
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
         if (!combinePlan.isEmpty()) {
@@ -344,11 +344,11 @@ public class TezDagBuilder extends TezOp
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
-            return new EdgeProperty((EdgeManagerPluginDescriptor)null,
+            return EdgeProperty.create((EdgeManagerPluginDescriptor)null,
                     edge.dataSourceType, edge.schedulingType, out, in);
             }
 
-        return new EdgeProperty(edge.dataMovementType, edge.dataSourceType,
+        return EdgeProperty.create(edge.dataMovementType, edge.dataSourceType,
                 edge.schedulingType, out, in);
     }
 
@@ -380,7 +380,7 @@ public class TezDagBuilder extends TezOp
 
     private Vertex newVertex(TezOperator tezOp, boolean isMap) throws IOException,
             ClassNotFoundException, InterruptedException {
-        ProcessorDescriptor procDesc = new ProcessorDescriptor(
+        ProcessorDescriptor procDesc = ProcessorDescriptor.create(
                 tezOp.getProcessorName());
 
         // Pass physical plans to vertex as user payload.
@@ -563,7 +563,7 @@ public class TezDagBuilder extends TezOp
         UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
         procDesc.setUserPayload(userPayload);
 
-        Vertex vertex = new Vertex(tezOp.getOperatorKey().toString(), procDesc, tezOp.getVertexParallelism(),
+        Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, tezOp.getVertexParallelism(),
                 isMap ? MRHelpers.getResourceForMRMapper(globalConf) : MRHelpers.getResourceForMRReducer(globalConf));
 
         Map<String, String> taskEnv = new HashMap<String, String>();
@@ -595,13 +595,13 @@ public class TezDagBuilder extends TezOp
 
             // TODO: These should get the globalConf, or a merged version that
             // keeps settings like pig.maxCombinedSplitSize
-            vertex.setLocationHint(new VertexLocationHint(tezOp.getLoaderInfo().getInputSplitInfo().getTaskLocationHints()));
+            vertex.setLocationHint(VertexLocationHint.create(tezOp.getLoaderInfo().getInputSplitInfo().getTaskLocationHints()));
             vertex.addDataSource(ld.getOperatorKey().toString(),
-                    new DataSourceDescriptor(new InputDescriptor(MRInput.class.getName())
-                            .setUserPayload(new UserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
-                            .setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
-                            .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteArray())),
-                    new InputInitializerDescriptor(MRInputSplitDistributor.class.getName()), dag.getCredentials()));
+                    DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
+                          .setUserPayload(UserPayload.create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+                          .setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
+                          .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer())),
+                    InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()), dag.getCredentials()));
         }
 
         for (POStore store : stores) {
@@ -616,7 +616,7 @@ public class TezDagBuilder extends TezOp
             outputPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
                     ObjectSerializer.serialize(singleStore));
 
-            OutputDescriptor storeOutDescriptor = new OutputDescriptor(
+            OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
                     MROutput.class.getName()).setUserPayload(TezUtils
                     .createUserPayloadFromConf(outputPayLoad));
             if (tezOp.getVertexGroupStores() != null) {
@@ -629,7 +629,7 @@ public class TezDagBuilder extends TezOp
             }
             vertex.addDataSink(store.getOperatorKey().toString(),
                     new DataSinkDescriptor(storeOutDescriptor,
-                    new OutputCommitterDescriptor(MROutputCommitter.class.getName()),
+                    OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
                     dag.getCredentials()));
         }
 
@@ -647,7 +647,7 @@ public class TezDagBuilder extends TezOp
                 // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able
                 // to decrease/increase parallelism of sorting vertex dynamically
                 // based on the numQuantiles calculated by sample aggregation vertex
-                vertex.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
+                vertex.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
                         PartitionerDefinedVertexManager.class.getName()));
                 log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
             } else {
@@ -664,7 +664,7 @@ public class TezDagBuilder extends TezOp
                 if (containScatterGather && !containCustomPartitioner) {
                     // Use auto-parallelism feature of ShuffleVertexManager to dynamically
                     // reduce the parallelism of the vertex
-                    VertexManagerPluginDescriptor vmPluginDescriptor = new VertexManagerPluginDescriptor(
+                    VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(
                             ShuffleVertexManager.class.getName());
                     Configuration vmPluginConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
                     vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1619023&r1=1619022&r2=1619023&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Wed Aug 20 02:16:38 2014
@@ -84,7 +84,7 @@ public class TezSessionManager {
             throws TezException, IOException, InterruptedException {
         TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
         String jobName = conf.get(PigContext.JOB_NAME, "pig");
-        TezClient tezClient = new TezClient(jobName, amConf, true, requestedAMResources, creds);
+        TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
         tezClient.start();
         TezAppMasterStatus appMasterStatus = tezClient.getAppMasterStatus();
         if (appMasterStatus.equals(TezAppMasterStatus.SHUTDOWN)) {