You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/01/23 02:06:22 UTC
svn commit: r1560565 [1/3] - in /pig/branches/tez:
shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/optimizer/ src/org/apach...
Author: rohini
Date: Thu Jan 23 01:06:21 2014
New Revision: 1560565
URL: http://svn.apache.org/r1560565
Log:
PIG-3626: Make combiners, custom partitioners and secondary key sort work for multiple outputs (rohini)
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java
pig/branches/tez/test/org/apache/pig/test/TestSecondarySortMR.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC14.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC15.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java
Modified:
pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
pig/branches/tez/src/org/apache/pig/PigConfiguration.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ColumnInfo.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
pig/branches/tez/src/org/apache/pig/impl/io/NullableTuple.java
pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java
pig/branches/tez/test/org/apache/pig/test/MiniGenericCluster.java
pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java
pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline2.java
pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java
pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java
pig/branches/tez/test/org/apache/pig/test/Util.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC8.gld
pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
pig/branches/tez/test/tez-tests
Modified: pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java (original)
+++ pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java Thu Jan 23 01:06:21 2014
@@ -22,10 +22,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.pig.ExecType;
/**
* This class builds a single instance of itself with the Singleton
@@ -48,7 +49,12 @@ public class MiniCluster extends MiniGen
@Deprecated
public static MiniCluster buildCluster() {
System.setProperty("test.exec.type", "mr");
- return (MiniCluster)MiniGenericCluster.buildCluster();
+ return (MiniCluster)MiniGenericCluster.buildCluster("mr");
+ }
+
+ @Override
+ protected ExecType getExecType() {
+ return ExecType.MAPREDUCE;
}
@Override
Modified: pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Thu Jan 23 01:06:21 2014
@@ -27,9 +27,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
import org.apache.pig.backend.hadoop.executionengine.tez.TezSessionManager;
+import org.apache.tez.common.TezJobConfig;
public class TezMiniCluster extends MiniGenericCluster {
private static final File CONF_DIR = new File("build/classes");
@@ -38,26 +42,21 @@ public class TezMiniCluster extends Mini
private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
+ private static final ExecType TEZ = new TezExecType();
protected MiniMRYarnCluster m_mr = null;
private Configuration m_dfs_conf = null;
private Configuration m_mr_conf = null;
@Override
+ protected ExecType getExecType() {
+ return TEZ;
+ }
+
+ @Override
public void setupMiniDfsAndMrClusters() {
try {
- if (TEZ_CONF_FILE.exists()) {
- TEZ_CONF_FILE.delete();
- }
- if (CORE_CONF_FILE.exists()) {
- CORE_CONF_FILE.delete();
- }
- if (HDFS_CONF_FILE.exists()) {
- HDFS_CONF_FILE.delete();
- }
- if (MAPRED_CONF_FILE.exists()) {
- MAPRED_CONF_FILE.delete();
- }
+ deleteConfFiles();
CONF_DIR.mkdirs();
// Build mini DFS cluster
@@ -86,7 +85,8 @@ public class TezMiniCluster extends Mini
m_mr_conf.set("mapreduce.framework.name", "yarn-tez");
m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
System.getProperty("java.class.path"));
-
+ // TODO PIG-3659 - Remove this once memory management is fixed
+ m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx384M");// -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8005 -Xnoagent -Djava.compiler=NONE");
m_mr_conf.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
m_fileSys.copyFromLocalFile(
new Path(MAPRED_CONF_FILE.getAbsoluteFile().toString()),
@@ -103,6 +103,8 @@ public class TezMiniCluster extends Mini
// Write tez-site.xml
Configuration tez_conf = new Configuration(false);
+ // TODO PIG-3659 - Remove this once memory management is fixed
+ tez_conf.set(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, "20");
tez_conf.set("tez.lib.uris", "hdfs:///tez,hdfs:///tez/lib");
tez_conf.writeXml(new FileOutputStream(TEZ_CONF_FILE));
m_fileSys.copyFromLocalFile(
@@ -141,6 +143,14 @@ public class TezMiniCluster extends Mini
@Override
protected void shutdownMiniMrClusters() {
+ deleteConfFiles();
+ if (m_mr != null) {
+ m_mr.stop();
+ m_mr = null;
+ }
+ }
+
+ private void deleteConfFiles() {
if(TEZ_CONF_FILE.exists()) {
TEZ_CONF_FILE.delete();
}
@@ -153,9 +163,5 @@ public class TezMiniCluster extends Mini
if(MAPRED_CONF_FILE.exists()) {
MAPRED_CONF_FILE.delete();
}
- if (m_mr != null) {
- m_mr.stop();
- m_mr = null;
- }
}
}
Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Thu Jan 23 01:06:21 2014
@@ -96,13 +96,13 @@ public class PigConfiguration {
* will be set in the environment.
*/
public static final String PIG_STREAMING_ENVIRONMENT = "pig.streaming.environment";
-
+
/**
* This key is used to define the default load func. Pig will fallback on PigStorage
* as default in case this is undefined.
*/
public static final String PIG_DEFAULT_LOAD_FUNC = "pig.default.load.func";
-
+
/**
* This key is used to define the default store func. Pig will fallback on PigStorage
* as default in case this is undefined.
@@ -180,5 +180,11 @@ public class PigConfiguration {
* Smaller files are combined untill this size is reached.
*/
public static final String PIG_MAX_COMBINED_SPLIT_SIZE = "pig.maxCombinedSplitSize";
+
+ /**
+ * This key controls whether secondary sort key is used for optimization in case
+ * of nested distinct or sort
+ */
+ public static final String PIG_EXEC_NO_SECONDARY_KEY = "pig.exec.nosecondarykey";
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ColumnInfo.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ColumnInfo.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ColumnInfo.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ColumnInfo.java Thu Jan 23 01:06:21 2014
@@ -28,7 +28,7 @@ public class ColumnInfo implements Clone
byte resultType;
int startCol = -1;
boolean isRangeProject = false;
-
+
public ColumnInfo(List<Integer> columns, byte type) {
this.columns = columns;
this.resultType = type;
@@ -44,7 +44,24 @@ public class ColumnInfo implements Clone
this.resultType = type;
this.isRangeProject = true;
}
-
+
+ public byte getResultType() {
+ return resultType;
+ }
+
+ public int getStartCol() {
+ return startCol;
+ }
+
+ public boolean isRangeProject() {
+ return isRangeProject;
+ }
+
+ public List<Integer> getColumns() {
+ return columns;
+ }
+
+ @Override
public String toString() {
String result;
if (isStar())
@@ -53,32 +70,33 @@ public class ColumnInfo implements Clone
result+=DataType.findTypeName(resultType);
return result;
}
-
+
private boolean isStar() {
return isRangeProject && startCol == 0;
}
+ @Override
public boolean equals(Object o2)
{
-
+
if (o2 == null || !(o2 instanceof ColumnInfo))
return false;
ColumnInfo c2 = (ColumnInfo)o2;
if (
isRangeProject == c2.isRangeProject &&
startCol == c2.startCol &&
- ((columns == null && c2.columns == null) ||
+ ((columns == null && c2.columns == null) ||
(columns != null && columns.equals(c2.columns)))
)
return true;
-
+
return false;
}
@Override
public int hashCode() {
return toString().hashCode();
}
-
+
@Override
public Object clone() throws CloneNotSupportedException{
ColumnInfo newColInfo = (ColumnInfo)super.clone();
@@ -87,6 +105,6 @@ public class ColumnInfo implements Clone
cols.addAll(this.columns);
newColInfo.columns = cols;
return newColInfo;
-
+
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Jan 23 01:06:21 2014
@@ -95,6 +95,7 @@ public class MapReduceLauncher extends L
private boolean aggregateWarning = false;
+ @Override
public void kill() {
try {
log.debug("Receive kill signal");
@@ -111,6 +112,7 @@ public class MapReduceLauncher extends L
}
}
+ @Override
public void killJob(String jobID, Configuration conf) throws BackendException {
try {
if (conf != null) {
@@ -639,9 +641,9 @@ public class MapReduceLauncher extends L
la.adjust();
}
// Optimize to use secondary sort key if possible
- prop = pc.getProperties().getProperty("pig.exec.nosecondarykey");
+ prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
if (!pc.inIllustrator && !("true".equals(prop))) {
- SecondaryKeyOptimizer skOptimizer = new SecondaryKeyOptimizer(plan);
+ SecondaryKeyOptimizerMR skOptimizer = new SecondaryKeyOptimizerMR(plan);
skOptimizer.visit();
}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+@InterfaceAudience.Private
+public class SecondaryKeyOptimizerMR extends MROpPlanVisitor implements SecondaryKeyOptimizer {
+ private static Log log = LogFactory.getLog(SecondaryKeyOptimizerMR.class);
+ private SecondaryKeyOptimizerInfo info;
+
+ /**
+ * @param plan
+ * The MROperPlan to visit to discover keyType
+ */
+ public SecondaryKeyOptimizerMR(MROperPlan plan) {
+ super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+ }
+
+
+ @Override
+ public void visitMROp(MapReduceOper mr) throws VisitorException {
+ // Only optimize for Cogroup case
+ if (mr.isGlobalSort())
+ return;
+
+ info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(mr.mapPlan, mr.reducePlan);
+ if (info != null && info.isUseSecondaryKey()) {
+ mr.setUseSecondaryKey(true);
+ mr.setSecondarySortOrder(info.getSecondarySortOrder());
+ log.info("Using Secondary Key Optimization for MapReduce node " + mr.getOperatorKey());
+ }
+ }
+
+
+ @Override
+ public int getNumSortRemoved() {
+ return (info == null) ? 0 : info.getNumSortRemoved();
+ }
+
+ @Override
+ public int getNumDistinctChanged() {
+ return (info == null) ? 0 : info.getNumDistinctChanged();
+ }
+
+ @Override
+ public int getNumUseSecondaryKey() {
+ return (info == null) ? 0 : info.getNumUseSecondaryKey();
+ }
+
+}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.optimizer;
+
+import org.apache.pig.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+/**
+ * Remove POSort and change PODistinct to use POSortedDistinct in nested foreach plan by sorting on a secondary key
+ */
+public interface SecondaryKeyOptimizer {
+
+ int getNumSortRemoved();
+
+ int getNumDistinctChanged();
+
+ int getNumUseSecondaryKey();
+
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Jan 23 01:06:21 2014
@@ -181,6 +181,10 @@ public abstract class PhysicalOperator e
return (alias == null) ? "" : (alias + ": ");
}
+ public void setAlias(String alias) {
+ this.alias = alias;
+ }
+
public void addOriginalLocation(String alias, SourceLocation sourceLocation) {
this.alias = alias;
this.originalLocations.add(new OriginalLocation(alias, sourceLocation.line(), sourceLocation.offset()));
@@ -282,9 +286,7 @@ public abstract class PhysicalOperator e
try {
if (input == null && (inputs == null || inputs.size() == 0)) {
// log.warn("No inputs found. Signaling End of Processing.");
- Result res = new Result();
- res.returnStatus = POStatus.STATUS_EOP;
- return res;
+ return new Result(POStatus.STATUS_EOP, null);
}
// Should be removed once the model is clear
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Jan 23 01:06:21 2014
@@ -81,7 +81,7 @@ public class POLocalRearrange extends Ph
protected boolean mIsDistinct = false;
protected boolean isCross = false;
-
+
protected Result inp;
// map to store mapping of projected columns to
@@ -529,11 +529,16 @@ public class POLocalRearrange extends Ph
return keyType;
}
+ public byte getMainKeyType() {
+ return mainKeyType;
+ }
+
public void setKeyType(byte keyType) {
if (useSecondaryKey) {
this.mainKeyType = keyType;
} else {
this.keyType = keyType;
+ this.mainKeyType = keyType;
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Thu Jan 23 01:06:21 2014
@@ -83,9 +83,9 @@ public class POReservoirSample extends P
// populate the samples array with first numSamples tuples
Result res = null;
int rowProcessed = 0;
- while (rowProcessed<numSamples) {
+ while (rowProcessed < numSamples) {
res = processInput();
- if(res.returnStatus == POStatus.STATUS_OK) {
+ if (res.returnStatus == POStatus.STATUS_OK) {
samples[rowProcessed] = res;
rowProcessed++;
} else if (res.returnStatus == POStatus.STATUS_NULL) {
@@ -95,14 +95,14 @@ public class POReservoirSample extends P
}
}
- int rowNum = numSamples+1;
+ int rowNum = numSamples + 1;
Random randGen = new Random();
- if(res.returnStatus == POStatus.STATUS_OK){ // did not exhaust all tuples
- while(true){
+ if (res.returnStatus == POStatus.STATUS_OK) { // did not exhaust all tuples
+ while (true) {
// pick this as sample
Result sampleResult = processInput();
- if(sampleResult.returnStatus == POStatus.STATUS_NULL) {
+ if (sampleResult.returnStatus == POStatus.STATUS_NULL) {
continue;
} else if (sampleResult.returnStatus != POStatus.STATUS_OK) {
break;
@@ -110,7 +110,7 @@ public class POReservoirSample extends P
// collect samples until input is exhausted
int rand = randGen.nextInt(rowNum);
- if(rand < numSamples){
+ if (rand < numSamples) {
samples[rand] = sampleResult;
}
rowNum++;
@@ -125,11 +125,14 @@ public class POReservoirSample extends P
if (illustrator != null) {
illustratorMarkup(samples[nextSampleIdx].result, samples[nextSampleIdx].result, 0);
}
- return samples[nextSampleIdx++];
+ Result res = samples[nextSampleIdx++];
+ if (res == null) { // Input data has lesser rows than numSamples
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+ return res;
}
else{
- Result res = new Result();
- res.returnStatus = POStatus.STATUS_EOP;
+ Result res = new Result(POStatus.STATUS_EOP, null);
return res;
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Thu Jan 23 01:06:21 2014
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.net.URI;
import java.util.LinkedList;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -76,6 +77,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import com.google.common.collect.Lists;
@@ -128,6 +130,25 @@ public class PlanHelper {
return finder.getFoundOps();
}
+ /**
+ * Finds POLocalRearrange from POSplit sub-plan
+ * @param plan physical plan
+ * @param rearrangeKey operator key of the POLocalRearrange
+ * @return POLocalRearrange with the specified operator key which is in a sub-plan of POSplit
+ * @throws VisitorException
+ */
+ public static PhysicalPlan getLocalRearrangePlanFromSplit(PhysicalPlan plan, OperatorKey rearrangeKey) throws VisitorException {
+ List<POSplit> splits = PlanHelper.getPhysicalOperators(plan, POSplit.class);
+ for (POSplit split : splits) {
+ for (PhysicalPlan subPlan : split.getPlans()) {
+ if (subPlan.getOperator(rearrangeKey) != null) {
+ return subPlan;
+ }
+ }
+ }
+ return plan;
+ }
+
private static class OpFinder<C extends PhysicalOperator> extends PhyPlanVisitor {
final Class<C> opClass;
@@ -425,6 +446,7 @@ public class PlanHelper {
visit(stream);
}
+ @Override
public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
super.visitSkewedJoin(sk);
visit(sk);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java Thu Jan 23 01:06:21 2014
@@ -20,13 +20,11 @@ package org.apache.pig.backend.hadoop.ex
import java.util.List;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -34,7 +32,6 @@ import org.apache.pig.impl.plan.VisitorE
*/
public class CombinerOptimizer extends TezOpPlanVisitor {
private CompilationMessageCollector messageCollector = null;
- private TezOperPlan parentPlan;
private boolean doMapAgg;
public CombinerOptimizer(TezOperPlan plan, boolean doMapAgg) {
@@ -46,7 +43,6 @@ public class CombinerOptimizer extends T
super(plan, new DepthFirstWalker<TezOperator, TezOperPlan>(plan));
this.messageCollector = messageCollector;
this.doMapAgg = doMapAgg;
- this.parentPlan = plan;
}
public CompilationMessageCollector getMessageCollector() {
@@ -60,37 +56,38 @@ public class CombinerOptimizer extends T
return;
}
- List<TezOperator> predecessors = parentPlan.getPredecessors(to);
+ List<TezOperator> predecessors = mPlan.getPredecessors(to);
if (predecessors == null) {
return;
}
for (TezOperator from : predecessors) {
- List<POLocalRearrange> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrange.class);
+ List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
if (rearranges.isEmpty()) {
continue;
}
+ POLocalRearrangeTez connectingLR = null;
+ PhysicalPlan rearrangePlan = from.plan;
+ for (POLocalRearrangeTez lr : rearranges) {
+ if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+ connectingLR = lr;
+ break;
+ }
+ }
+
+ if (from.plan.getOperator(connectingLR.getOperatorKey()) == null) {
+ // The POLocalRearrange is sub-plan of a POSplit
+ rearrangePlan = PlanHelper.getLocalRearrangePlanFromSplit(from.plan, connectingLR.getOperatorKey());
+ }
+
// Detected the POLocalRearrange -> POPackage pattern. Let's add
// combiner if possible.
PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
- // TODO: Right now, CombinerOptimzerUtil doesn't handle a single map
- // plan with multiple POLocalRearrange leaves. i.e. SPLIT + multiple
- // GROUP BY with different keys.
- CombinerOptimizerUtil.addCombiner(from.plan, to.plan, combinePlan, messageCollector, doMapAgg);
-
- //Replace POLocalRearrange with POLocalRearrangeTez
- if (!combinePlan.isEmpty()) {
- POLocalRearrange lr = (POLocalRearrange) combinePlan.getLeaves().get(0);
- POLocalRearrangeTez lrt = new POLocalRearrangeTez(lr);
- lrt.setOutputKey(to.getOperatorKey().toString());
- try {
- combinePlan.replace(lr, lrt);
- } catch (PlanException e) {
- throw new VisitorException(e);
- }
- }
+ CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
+
}
}
+
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Thu Jan 23 01:06:21 2014
@@ -25,9 +25,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -39,6 +37,7 @@ import org.apache.pig.impl.io.NullableTu
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.input.ShuffledMergedInput;
public class POShuffleTezLoad extends POPackage implements TezLoad {
@@ -62,12 +61,8 @@ public class POShuffleTezLoad extends PO
@Override
public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
throws ExecException {
- try {
- comparator = ReflectionUtils.newInstance(
- TezDagBuilder.comparatorForKeyType(pkgr.getKeyType(), isSkewedJoin), conf);
- } catch (JobCreationException e) {
- throw new ExecException(e);
- }
+
+ comparator = (WritableComparator) ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
try {
for (String key : inputKeys) {
LogicalInput input = inputs.get(key);
@@ -105,7 +100,7 @@ public class POShuffleTezLoad extends PO
while (res.returnStatus == POStatus.STATUS_EOP) {
boolean hasData = false;
Object cur = null;
- Object min = null;
+ PigNullableWritable min = null;
try {
for (int i = 0; i < numInputs; i++) {
@@ -113,11 +108,12 @@ public class POShuffleTezLoad extends PO
hasData = true;
cur = readers.get(i).getCurrentKey();
if (min == null || comparator.compare(min, cur) > 0) {
- min = cur;
+ min = PigNullableWritable.newInstance((PigNullableWritable)cur);
+ cur = min;
}
}
}
- } catch (IOException e) {
+ } catch (Exception e) {
throw new ExecException(e);
}
@@ -125,7 +121,7 @@ public class POShuffleTezLoad extends PO
return new Result(POStatus.STATUS_EOP, null);
}
- key = pkgr.getKey((PigNullableWritable) min);
+ key = pkgr.getKey(min);
DataBag[] bags = new DataBag[numInputs];
POPackageTupleBuffer buffer = new POPackageTupleBuffer();
@@ -133,11 +129,13 @@ public class POShuffleTezLoad extends PO
try {
for (int i = 0; i < numInputs; i++) {
+
DataBag bag = null;
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
- if (comparator.compare(min, cur) == 0) {
+ // We need to loop in case of Grouping Comparators
+ while (comparator.compare(min, cur) == 0) {
Iterable<Object> vals = readers.get(i).getCurrentValues();
if (isAccumulative()) {
// TODO: POPackageTupleBuffer expects the
@@ -150,14 +148,21 @@ public class POShuffleTezLoad extends PO
// into a new list and pass the iterator of this
// new list.
for (Object val : vals) {
- nTups.add((NullableTuple) val);
+ // Make a copy of key and val and avoid reference.
+ // getCurrentKey() or value iterator resets value
+ // on the same object by calling readFields() again.
+ nTups.add(new NullableTuple((NullableTuple) val));
}
// Attach input to POPackageTupleBuffer
- buffer.setKey(cur);
buffer.setIterator(nTups.iterator());
- bag = new AccumulativeBag(buffer, i);
+ if(bags[i] == null) {
+ buffer.setKey(cur);
+ bag = new AccumulativeBag(buffer, i);
+ } else {
+ bag = bags[i];
+ }
} else {
- bag = new InternalCachedBag(numInputs);
+ bag = bags[i] == null? new InternalCachedBag(numInputs) : bags[i];
for (Object val : vals) {
NullableTuple nTup = (NullableTuple) val;
int index = nTup.getIndex();
@@ -165,8 +170,12 @@ public class POShuffleTezLoad extends PO
bag.add(tup);
}
}
- finished[i] = !readers.get(i).next();
bags[i] = bag;
+ finished[i] = !readers.get(i).next();
+ if (finished[i]) {
+ break;
+ }
+ cur = readers.get(i).getCurrentKey();
}
}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,147 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Tez plan visitor that applies the secondary key sort optimization to the
+ * edges feeding each vertex. For every predecessor that shuffles into the
+ * vertex through a POLocalRearrangeTez, the plan rewrite is delegated to
+ * {@link SecondaryKeyOptimizerUtil#applySecondaryKeySort}. When a secondary
+ * key is used, both the receiving vertex and the connecting edge are marked
+ * so that TezDagBuilder configures the secondary key comparators and
+ * partitioner.
+ */
+@InterfaceAudience.Private
+public class SecondaryKeyOptimizerTez extends TezOpPlanVisitor implements SecondaryKeyOptimizer {
+
+ private static final Log log = LogFactory.getLog(SecondaryKeyOptimizerTez.class);
+
+ private int numSortRemoved = 0;
+ private int numDistinctChanged = 0;
+ private int numUseSecondaryKey = 0;
+
+ /**
+ * @param plan the Tez operator plan, walked in dependency order
+ */
+ public SecondaryKeyOptimizerTez(TezOperPlan plan) {
+ super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+ }
+
+ /**
+ * Applies the secondary key optimization to the incoming edges of
+ * {@code to}. Predecessors without a POLocalRearrangeTez targeting
+ * {@code to} are skipped. Processing of the whole vertex stops as soon as
+ * a connected predecessor is a global sort or its incoming edge has a
+ * custom partitioner.
+ */
+ @Override
+ public void visitTezOp(TezOperator to) throws VisitorException {
+
+ List<TezOperator> predecessors = mPlan.getPredecessors(to);
+ if (predecessors == null) {
+ return;
+ }
+
+ String toKey = to.getOperatorKey().toString();
+ for (TezOperator from : predecessors) {
+ List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
+ if (rearranges.isEmpty()) {
+ continue;
+ }
+
+ // Find the POLocalRearrangeTez whose output is routed to this vertex.
+ POLocalRearrangeTez connectingLR = null;
+ for (POLocalRearrangeTez lr : rearranges) {
+ if (lr.getOutputKey().equals(toKey)) {
+ connectingLR = lr;
+ break;
+ }
+ }
+ if (connectingLR == null) {
+ // None of the rearranges feed this vertex (they target other
+ // vertices), so there is nothing to optimize on this edge.
+ continue;
+ }
+
+ TezEdgeDescriptor inEdge = to.inEdges.get(from.getOperatorKey());
+ // Only optimize for Cogroup case
+ if (from.isGlobalSort()) {
+ return;
+ }
+
+ // If there is a custom partitioner do not do secondary key optimization.
+ // MR SecondaryKeyOptimizer currently does not check for this case.
+ if (inEdge.partitionerClass != null) {
+ return;
+ }
+
+ PhysicalPlan rearrangePlan = from.plan;
+ if (from.plan.getOperator(connectingLR.getOperatorKey()) == null) {
+ // The POLocalRearrange is sub-plan of a POSplit
+ rearrangePlan = PlanHelper.getLocalRearrangePlanFromSplit(from.plan, connectingLR.getOperatorKey());
+ }
+
+ //TODO: Case of from plan leaf being POUnion.
+ SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(rearrangePlan, to.plan);
+ if (info != null) {
+ numSortRemoved += info.getNumSortRemoved();
+ numDistinctChanged += info.getNumDistinctChanged();
+ numUseSecondaryKey += info.getNumUseSecondaryKey();
+ if (info.isUseSecondaryKey()) {
+ // Set it on the receiving vertex and the connecting edge.
+ to.setUseSecondaryKey(true);
+ inEdge.setUseSecondaryKey(true);
+ inEdge.setSecondarySortOrder(info.getSecondarySortOrder());
+ log.info("Using Secondary Key Optimization in the edge between vertex - "
+ + to.getOperatorKey()
+ + " and vertex - "
+ + from.getOperatorKey());
+ }
+ }
+ }
+ }
+
+ @Override
+ public int getNumSortRemoved() {
+ return numSortRemoved;
+ }
+
+ @Override
+ public int getNumDistinctChanged() {
+ return numDistinctChanged;
+ }
+
+ @Override
+ public int getNumUseSecondaryKey() {
+ return numUseSecondaryKey;
+ }
+
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java Thu Jan 23 01:06:21 2014
@@ -43,12 +43,11 @@ public class SkewedPartitionerTez extend
try {
if (distMap != null) {
- Integer[] totalReducers = new Integer[1];
// The distMap is structured as (key, min, max) where min, max
// being the index of the reducers
DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
- totalReducers[0] = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+ totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
Iterator<Tuple> it = partitionList.iterator();
while (it.hasNext()) {
Tuple idxTuple = it.next();
@@ -56,7 +55,7 @@ public class SkewedPartitionerTez extend
Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
// Used to replace the maxIndex with the number of reducers
if (maxIndex < minIndex) {
- maxIndex = totalReducers[0] + maxIndex;
+ maxIndex = totalReducers + maxIndex;
}
Tuple keyT;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Thu Jan 23 01:06:21 2014
@@ -92,9 +92,9 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.CompilerUtils;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.relational.LOJoin;
@@ -237,12 +237,6 @@ public class TezCompiler extends PhyPlan
for (TezOperator tezOper : splitsSeen.values()) {
int idx = 0;
- List<POLocalRearrange> rearranges = PlanHelper
- .getPhysicalOperators(tezOper.plan, POLocalRearrange.class);
- for (POLocalRearrange op : rearranges) {
- op.setIndex(idx++);
- }
- idx = 0;
List<POStore> strs = PlanHelper.getPhysicalOperators(tezOper.plan,
POStore.class);
for (POStore op : strs) {
@@ -268,7 +262,6 @@ public class TezCompiler extends PhyPlan
parentOper.outEdges.put(entry.getKey(), entry.getValue());
}
}
- // TODO: handle custom partitioner, secondary key on edges
}
private void connectSoftLink() throws PlanException, IOException {
@@ -405,12 +398,18 @@ public class TezCompiler extends PhyPlan
tezPlan.add(newTezOp);
for (TezOperator tezOp : compiledInputs) {
tezOp.setClosed(true);
- handleSplitAndConnect(tezOp, newTezOp);
+ handleSplitAndConnect(tezPlan, tezOp, newTezOp);
}
curTezOp = newTezOp;
}
- private void handleSplitAndConnect(TezOperator from, TezOperator to)
+ private TezEdgeDescriptor handleSplitAndConnect(TezOperPlan tezPlan, TezOperator from, TezOperator to)
+ throws PlanException {
+ return handleSplitAndConnect(tezPlan, from, to, true);
+ }
+
+ private TezEdgeDescriptor handleSplitAndConnect(TezOperPlan tezPlan,
+ TezOperator from, TezOperator to, boolean addToSplitPlan)
throws PlanException {
// Add edge descriptors from POLocalRearrange in POSplit
// sub-plan to new operators
@@ -420,19 +419,23 @@ public class TezCompiler extends PhyPlan
POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
lr.setOutputKey(to.getOperatorKey().toString());
}
+ TezEdgeDescriptor edge = null;
if (from.isSplitSubPlan()) {
- // Set inputs to null as POSplit will attach input to roots
- for (PhysicalOperator root : from.plan.getRoots()) {
- root.setInputs(null);
- }
TezOperator splitOp = splitsSeen.get(from.getSplitOperatorKey());
- POSplit split = findPOSplit(splitOp, from.getSplitOperatorKey());
- split.addPlan(from.plan);
- addSubPlanPropertiesToParent(splitOp, curTezOp);
- TezCompilerUtil.connect(tezPlan, splitOp, to);
+ if (addToSplitPlan) {
+ // Set inputs to null as POSplit will attach input to roots
+ for (PhysicalOperator root : from.plan.getRoots()) {
+ root.setInputs(null);
+ }
+ POSplit split = findPOSplit(splitOp, from.getSplitOperatorKey());
+ split.addPlan(from.plan);
+ addSubPlanPropertiesToParent(splitOp, curTezOp);
+ }
+ edge = TezCompilerUtil.connect(tezPlan, splitOp, to);
} else {
- TezCompilerUtil.connect(tezPlan, from, to);
+ edge = TezCompilerUtil.connect(tezPlan, from, to);
}
+ return edge;
}
private POSplit findPOSplit(TezOperator tezOp, OperatorKey splitKey)
@@ -647,12 +650,13 @@ public class TezCompiler extends PhyPlan
try {
POLocalRearrange lr = localRearrangeFactory.create();
lr.setDistinct(true);
+ lr.setAlias(op.getAlias());
curTezOp.plan.addAsLeaf(lr);
- curTezOp.customPartitioner = op.getCustomPartitioner();
TezOperator lastOp = curTezOp;
// Mark the start of a new TezOperator, connecting the inputs.
blocking();
+ TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
// Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented
// with a global variable and a specific DistinctCombiner class. This seems better.
@@ -660,6 +664,7 @@ public class TezCompiler extends PhyPlan
addDistinctPlan(combinePlan, 1);
POLocalRearrangeTez clr = localRearrangeFactory.create();
+ clr.setOutputKey(curTezOp.getOperatorKey().toString());
clr.setDistinct(true);
combinePlan.addAsLeaf(clr);
@@ -745,11 +750,14 @@ public class TezCompiler extends PhyPlan
lr.setOutputKey(curTezOp.getOperatorKey().toString());
tezOp.plan.addAsLeaf(lr);
- TezCompilerUtil.connect(tezPlan, tezOp, curTezOp);
- inputKeys.add(tezOp.getOperatorKey().toString());
+ TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, tezOp, curTezOp);
+ if (tezOp.isSplitSubPlan()) { // edge was added from the split vertex, see handleSplitAndConnect
+ inputKeys.add(splitsSeen.get(tezOp.getSplitOperatorKey()).getOperatorKey().toString());
+ } else {
+ inputKeys.add(tezOp.getOperatorKey().toString());
+ }
// Configure broadcast edges for replicated tables
- TezEdgeDescriptor edge = curTezOp.inEdges.get(tezOp.getOperatorKey());
edge.dataMovementType = DataMovementType.BROADCAST;
edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
@@ -818,6 +826,7 @@ public class TezCompiler extends PhyPlan
// Need to add POLocalRearrange to the end of the last tezOp before we shuffle.
POLocalRearrange lr = localRearrangeFactory.create();
+ lr.setAlias(op.getAlias());
curTezOp.plan.addAsLeaf(lr);
// Mark the start of a new TezOperator, connecting the inputs.
@@ -833,11 +842,14 @@ public class TezCompiler extends PhyPlan
// Then add a POPackage and a POForEach to the start of the new tezOp.
POPackage pkg = getPackage(1, DataType.TUPLE);
POForEach forEach = TezCompilerUtil.getForEachPlain(scope, nig);
+ pkg.setAlias(op.getAlias());
+ forEach.setAlias(op.getAlias());
curTezOp.plan.add(pkg);
curTezOp.plan.addAsLeaf(forEach);
if (!pigContext.inIllustrator) {
POLimit limitCopy = new POLimit(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ limitCopy.setAlias(op.getAlias());
limitCopy.setLimit(op.getLimit());
limitCopy.setLimitPlan(op.getLimitPlan());
curTezOp.plan.addAsLeaf(limitCopy);
@@ -889,7 +901,7 @@ public class TezCompiler extends PhyPlan
public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException {
try {
blocking();
- curTezOp.customPartitioner = op.getCustomPartitioner();
+ TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
phyToTezOpMap.put(op, curTezOp);
} catch (Exception e) {
int errCode = 2034;
@@ -1179,6 +1191,7 @@ public class TezCompiler extends PhyPlan
@Override
public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
+ //TODO: handle split and connect
try {
// LR that transfers loaded input to partition vertex
POLocalRearrangeTez lrTez = localRearrangeFactory.create(LocalRearrangeType.NULL);
@@ -1939,21 +1952,18 @@ public class TezCompiler extends PhyPlan
Pair<TezOperator, Integer> quantJobParallelismPair = getQuantileJobs(op, rp);
TezOperator[] sortOpers = getSortJobs(op, quantJobParallelismPair.second, keyType, fields);
- lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
- lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
-
- TezCompilerUtil.connect(tezPlan, prevOper, sortOpers[0]);
- TezEdgeDescriptor edge = sortOpers[0].inEdges.get(prevOper.getOperatorKey());
+ TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, prevOper, sortOpers[0]);
// TODO: Convert to unsorted shuffle after TEZ-661
// edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
// edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
edge.partitionerClass = RoundRobinPartitioner.class;
- TezCompilerUtil.connect(tezPlan, prevOper, quantJobParallelismPair.first);
+ handleSplitAndConnect(tezPlan, prevOper, quantJobParallelismPair.first, false);
+ lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
+ lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
- TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
- edge = sortOpers[0].inEdges.get(quantJobParallelismPair.first.getOperatorKey());
+ edge = TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
edge.dataMovementType = DataMovementType.BROADCAST;
edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
@@ -1961,8 +1971,7 @@ public class TezCompiler extends PhyPlan
lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
sortOpers[0].sampleOperator = quantJobParallelismPair.first;
- TezCompilerUtil.connect(tezPlan, sortOpers[0], sortOpers[1]);
- edge = sortOpers[1].inEdges.get(sortOpers[0].getOperatorKey());
+ edge = TezCompilerUtil.connect(tezPlan, sortOpers[0], sortOpers[1]);
edge.partitionerClass = WeightedRangePartitionerTez.class;
curTezOp = sortOpers[1];
@@ -2052,6 +2061,7 @@ public class TezCompiler extends PhyPlan
// before we broadcast.
for (int i = 0; i < compiledInputs.length; i++) {
POLocalRearrangeTez lr = localRearrangeFactory.create(i, LocalRearrangeType.STAR);
+ lr.setAlias(op.getAlias());
lr.setUnion(true);
compiledInputs[i].plan.addAsLeaf(lr);
}
@@ -2062,6 +2072,7 @@ public class TezCompiler extends PhyPlan
// Then add a POPackage to the start of the new tezOp.
POPackage pkg = getPackage(compiledInputs.length, DataType.TUPLE);
+ pkg.setAlias(op.getAlias());
curTezOp.markUnion();
curTezOp.plan.add(pkg);
// TODO: Union should use OnFileUnorderedKVOutput instead of
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java Thu Jan 23 01:06:21 2014
@@ -1,5 +1,6 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -11,6 +12,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
@@ -19,6 +21,9 @@ import com.google.common.collect.Lists;
public class TezCompilerUtil {
+ private TezCompilerUtil() {
+ }
+
// simpleConnectTwoVertex is a utility to end a vertex equivalent to map and start vertex equivalent to
// reduce in a tez operator:
// 1. op1 is open
@@ -64,11 +69,13 @@ public class TezCompilerUtil {
connect(tezPlan, op1, op2);
}
- static public void connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException {
+ static public TezEdgeDescriptor connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException {
plan.connect(from, to);
// Add edge descriptors to old and new operators
- to.inEdges.put(from.getOperatorKey(), new TezEdgeDescriptor());
- from.outEdges.put(to.getOperatorKey(), new TezEdgeDescriptor());
+ TezEdgeDescriptor edge = new TezEdgeDescriptor();
+ to.inEdges.put(from.getOperatorKey(), edge);
+ from.outEdges.put(to.getOperatorKey(), edge);
+ return edge;
}
static public POForEach getForEach(POProject project, int rp, String scope, NodeIdGenerator nig) {
@@ -103,4 +110,24 @@ public class TezCompilerUtil {
st.setIsTmpStore(true);
return st;
}
+
+ /**
+ * Sets the partitioner class on every incoming edge of {@code tezOp}.
+ *
+ * @param customPartitioner partitioner class name, resolved through
+ * PigContext.resolveClassName; null leaves the edges untouched
+ * @param tezOp the vertex whose incoming edges receive the partitioner
+ * @throws IOException if the class name cannot be resolved
+ */
+ static public void setCustomPartitioner(String customPartitioner, TezOperator tezOp) throws IOException {
+ if (customPartitioner == null) {
+ return;
+ }
+ // Resolve once; the same class is applied to all incoming edges.
+ @SuppressWarnings("rawtypes")
+ Class partitionerClass = PigContext.resolveClassName(customPartitioner);
+ for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+ edge.partitionerClass = partitionerClass;
+ }
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Jan 23 01:06:21 2014
@@ -28,8 +28,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
@@ -37,13 +35,9 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
@@ -53,6 +47,8 @@ import org.apache.pig.backend.hadoop.HDa
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigDecimalRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigIntegerRawComparator;
@@ -66,8 +62,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigIntRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigLongRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -193,9 +191,10 @@ public class TezDagBuilder extends TezOp
for (POLocalRearrangeTez lr : lrs) {
if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
byte keyType = lr.getKeyType();
- setIntermediateInputKeyValue(keyType, conf, to.isSkewedJoin());
- setIntermediateOutputKeyValue(keyType, conf, to.isSkewedJoin());
- conf.set("pig.reduce.key.type", Byte.toString(keyType));
+ setIntermediateInputKeyValue(keyType, conf, to);
+ setIntermediateOutputKeyValue(keyType, conf, to);
+ // In case of secondary key sort, main key type is the actual key type
+ conf.set("pig.reduce.key.type", Byte.toString(lr.getMainKeyType()));
break;
}
}
@@ -203,28 +202,42 @@ public class TezDagBuilder extends TezOp
conf.setBoolean("mapred.mapper.new-api", true);
conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
- if (edge.partitionerClass != null) {
- conf.setClass("mapreduce.job.partitioner.class",
- edge.partitionerClass, Partitioner.class);
- }
-
if(from.isGlobalSort() || from.isLimitAfterSort()){
if (from.isGlobalSort()) {
- FileSystem fs = FileSystem.get(globalConf);
- Path quantFilePath = new Path(from.getQuantFile() + "/part-r-00000");
- FileStatus fstat = fs.getFileStatus(quantFilePath);
- LocalResource quantFileResource = LocalResource.newInstance(
- ConverterUtils.getYarnUrlFromPath(fstat.getPath()),
- LocalResourceType.FILE,
- LocalResourceVisibility.APPLICATION,
- fstat.getLen(),
- fstat.getModificationTime());
- localResources.put(quantFilePath.getName(), quantFileResource);
conf.set("pig.sortOrder",
ObjectSerializer.serialize(from.getSortOrder()));
}
}
+ if (edge.isUseSecondaryKey()) {
+ conf.set("pig.secondarySortOrder",
+ ObjectSerializer.serialize(edge.getSecondarySortOrder()));
+ conf.set(org.apache.hadoop.mapreduce.MRJobConfig.PARTITIONER_CLASS_ATTR,
+ SecondaryKeyPartitioner.class.getName());
+ // In MR - job.setSortComparatorClass() or MRJobConfig.KEY_COMPARATOR
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+ PigSecondaryKeyComparator.class.getName());
+ // In MR - job.setOutputKeyClass() or MRJobConfig.OUTPUT_KEY_CLASS
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, NullableTuple.class.getName());
+ // These needs to be on the vertex as well for POShuffleTezLoad to pick it up.
+ // Tez framework also expects this to be per vertex and not edge. IFile.java picks
+ // up keyClass and valueClass from vertex config. TODO - check with Tez folks
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+ PigSecondaryKeyComparator.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, NullableTuple.class.getName());
+ // In MR - job.setGroupingComparatorClass() or MRJobConfig.GROUP_COMPARATOR_CLASS
+ // TODO: Check why tez-mapreduce ReduceProcessor use two different tez
+ // settings for the same MRJobConfig.GROUP_COMPARATOR_CLASS and use only one
+ conf.set(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigSecondaryKeyGroupComparator.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
+ PigSecondaryKeyGroupComparator.class.getName());
+ }
+
+ if (edge.partitionerClass != null) {
+ conf.set(org.apache.hadoop.mapreduce.MRJobConfig.PARTITIONER_CLASS_ATTR,
+ edge.partitionerClass.getName());
+ }
+
in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
@@ -235,11 +248,11 @@ public class TezDagBuilder extends TezOp
private void addCombiner(PhysicalPlan combinePlan, TezOperator pkgTezOp,
Configuration conf) throws IOException {
POPackage combPack = (POPackage) combinePlan.getRoots().get(0);
- setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf, pkgTezOp.isSkewedJoin());
+ setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf, pkgTezOp);
POLocalRearrange combRearrange = (POLocalRearrange) combinePlan
.getLeaves().get(0);
- setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp.isSkewedJoin());
+ setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp);
LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
combinePlan, pkgTezOp, combPack);
@@ -324,8 +337,7 @@ public class TezDagBuilder extends TezOp
byte keyType = pack.getPkgr().getKeyType();
tezOp.plan.remove(pack);
payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
- payloadConf.set("pig.reduce.key.type", Byte.toString(keyType));
- setIntermediateInputKeyValue(keyType, payloadConf, tezOp.isSkewedJoin());
+ setIntermediateInputKeyValue(keyType, payloadConf, tezOp);
POShuffleTezLoad newPack;
if (tezOp.isUnion()) {
newPack = new POUnionTezLoad(pack);
@@ -355,7 +367,7 @@ public class TezDagBuilder extends TezOp
}
setIntermediateInputKeyValue(pack.getPkgr().getKeyType(), payloadConf,
- tezOp.isSkewedJoin());
+ tezOp);
}
payloadConf.setClass("mapreduce.outputformat.class",
@@ -601,28 +613,32 @@ public class TezDagBuilder extends TezOp
}
@SuppressWarnings("rawtypes")
- private void setIntermediateInputKeyValue(byte keyType, Configuration conf, boolean isSkewedJoin)
+ private void setIntermediateInputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
throws JobCreationException, ExecException {
- Class<? extends WritableComparable> keyClass = HDataType
- .getWritableComparableTypes(keyType).getClass();
- if (isSkewedJoin) {
+ if (tezOp.isUseSecondaryKey()) {
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+ NullableTuple.class.getName());
+ } else if (tezOp.isSkewedJoin()) {
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
NullablePartitionWritable.class.getName());
} else {
+ Class<? extends WritableComparable> keyClass = HDataType
+ .getWritableComparableTypes(keyType).getClass();
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
keyClass.getName());
+
}
+ selectInputComparator(conf, keyType, tezOp);
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
NullableTuple.class.getName());
- selectInputComparator(conf, keyType, isSkewedJoin);
}
@SuppressWarnings("rawtypes")
- private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, boolean isSkewedJoin)
+ private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
throws JobCreationException, ExecException {
Class<? extends WritableComparable> keyClass = HDataType
.getWritableComparableTypes(keyType).getClass();
- if (isSkewedJoin) {
+ if (tezOp.isSkewedJoin()) {
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
NullablePartitionWritable.class.getName());
} else {
@@ -633,17 +649,13 @@ public class TezDagBuilder extends TezOp
NullableTuple.class.getName());
conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
MRPartitioner.class.getName());
- selectOutputComparator(keyType, conf, isSkewedJoin);
+ selectOutputComparator(keyType, conf, tezOp);
}
- static Class<? extends WritableComparator> comparatorForKeyType(byte keyType, boolean isSkewedJoin)
+ static Class<? extends WritableComparator> comparatorForKeyType(byte keyType)
throws JobCreationException {
// TODO: Handle sorting like in JobControlCompiler
- if (isSkewedJoin) {
- return JobControlCompiler.PigGroupingPartitionWritableComparator.class;
- }
-
switch (keyType) {
case DataType.BOOLEAN:
return PigBooleanRawComparator.class;
@@ -695,19 +707,47 @@ public class TezDagBuilder extends TezOp
}
}
- void selectInputComparator(Configuration conf, byte keyType, boolean isSkewedJoin)
+ void selectInputComparator(Configuration conf, byte keyType, TezOperator tezOp)
throws JobCreationException {
// TODO: Handle sorting like in JobControlCompiler
- conf.setClass(
- TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
- comparatorForKeyType(keyType, isSkewedJoin), RawComparator.class);
+ // TODO: Group comparators as in JobControlCompiler
+ if (tezOp.isUseSecondaryKey()) {
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+ PigSecondaryKeyComparator.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS,
+ PigSecondaryKeyGroupComparator.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
+ PigSecondaryKeyGroupComparator.class.getName());
+ } else {
+ if (tezOp.isSkewedJoin()) {
+ // TODO: In MR, PigGroupingPartitionWritableComparator is used only as the group comparator.
+ // Which TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS would reproduce the MR behavior?
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+ PigGroupingPartitionWritableComparator.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS,
+ PigGroupingPartitionWritableComparator.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
+ PigGroupingPartitionWritableComparator.class.getName());
+ } else {
+ conf.setClass(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+ comparatorForKeyType(keyType), RawComparator.class);
+ }
+ }
}
- void selectOutputComparator(byte keyType, Configuration conf, boolean isSkewedJoin)
+ void selectOutputComparator(byte keyType, Configuration conf, TezOperator tezOp)
throws JobCreationException {
// TODO: Handle sorting like in JobControlCompiler
- conf.setClass(
- TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
- comparatorForKeyType(keyType, isSkewedJoin), RawComparator.class);
+ if (tezOp.isSkewedJoin()) {
+ // TODO: In MR, PigGroupingPartitionWritableComparator is used only as the group comparator.
+ // Which TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS would reproduce the MR behavior?
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+ PigGroupingPartitionWritableComparator.class.getName());
+ } else {
+ conf.setClass(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+ comparatorForKeyType(keyType), RawComparator.class);
+ }
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java Thu Jan 23 01:06:21 2014
@@ -34,11 +34,19 @@ public class TezEdgeDescriptor {
public String inputClassName;
public String outputClassName;
- public Class<? extends Partitioner> partitionerClass;
+
public SchedulingType schedulingType;
public DataSourceType dataSourceType;
public DataMovementType dataMovementType;
+ public Class<? extends Partitioner> partitionerClass;
+
+ // If true, we will use secondary key sort in the job
+ private boolean useSecondaryKey = false;
+
+ // Sort order of the secondary key columns
+ private boolean[] secondarySortOrder;
+
public TezEdgeDescriptor() {
combinePlan = new PhysicalPlan();
@@ -50,4 +58,25 @@ public class TezEdgeDescriptor {
dataSourceType = DataSourceType.PERSISTED;
dataMovementType = DataMovementType.SCATTER_GATHER;
}
+
+ public boolean isUseSecondaryKey() {
+ return useSecondaryKey;
+ }
+
+ public void setUseSecondaryKey(boolean useSecondaryKey) {
+ this.useSecondaryKey = useSecondaryKey;
+ }
+
+ public boolean[] getSecondarySortOrder() {
+ return secondarySortOrder;
+ }
+
+ public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+ if(null == secondarySortOrder) return;
+ this.secondarySortOrder = new boolean[secondarySortOrder.length];
+ for(int i = 0; i < secondarySortOrder.length; ++i) {
+ this.secondarySortOrder[i] = secondarySortOrder[i];
+ }
+ }
+
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Thu Jan 23 01:06:21 2014
@@ -150,7 +150,7 @@ public class TezLauncher extends Launche
throws PlanException, IOException, VisitorException {
TezCompiler comp = new TezCompiler(php, pc);
TezOperPlan tezPlan = comp.compile();
- Boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
+ boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
PigConfiguration.PROP_NO_COMBINER, "false"));
// Run CombinerOptimizer on Tez plan
@@ -162,6 +162,15 @@ public class TezLauncher extends Launche
co.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
}
+ // Run the optimizer that uses secondary key sort, when possible, for ORDER BY and
+ // DISTINCT nested inside a FOREACH. This must run before AccumulatorOptimizer.
+ boolean noSecKeySort = Boolean.parseBoolean(pc.getProperties().getProperty(
+ PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, "false"));
+ if (!pc.inIllustrator && !noSecKeySort) {
+ SecondaryKeyOptimizerTez skOptimizer = new SecondaryKeyOptimizerTez(tezPlan);
+ skOptimizer.visit();
+ }
+
// Run AccumulatorOptimizer on Tez plan
boolean isAccum = Boolean.parseBoolean(pc.getProperties().getProperty(
PigConfiguration.OPT_ACCUMULATOR, "true"));
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Thu Jan 23 01:06:21 2014
@@ -59,9 +59,6 @@ public class TezOperator extends Operato
int requestedMemory = 1024;
int requestedCpu = 1;
- // Name of the Custom Partitioner used
- String customPartitioner = null;
-
// Presence indicates that this TezOper is sub-plan of a POSplit.
// Only POStore or POLocalRearrange leaf can be a sub-plan of POSplit
private OperatorKey splitOperatorKey = null;
@@ -79,9 +76,6 @@ public class TezOperator extends Operato
//Indicates if this job is an order by job
boolean globalSort = false;
- //The quantiles file name if globalSort is true
- String quantFile;
-
//The sort order of the columns;
//asc is true and desc is false
boolean[] sortOrder;
@@ -100,6 +94,9 @@ public class TezOperator extends Operato
// If not null, need to collect sample sent from predecessor
TezOperator sampleOperator = null;
+ // If true, we will use secondary key sort in the job
+ private boolean useSecondaryKey = false;
+
// Types of blocking operators. For now, we only support the following ones.
private static enum OPER_FEATURE {
NONE,
@@ -211,6 +208,14 @@ public class TezOperator extends Operato
feature = OPER_FEATURE.SAMPLER;
}
+ public boolean isUseSecondaryKey() {
+ return useSecondaryKey;
+ }
+
+ public void setUseSecondaryKey(boolean useSecondaryKey) {
+ this.useSecondaryKey = useSecondaryKey;
+ }
+
@Override
public String name() {
String udfStr = getUDFsAsStr();
@@ -261,14 +266,6 @@ public class TezOperator extends Operato
return segmentBelow;
}
- public String getQuantFile() {
- return quantFile;
- }
-
- public void setQuantFile(String quantFile) {
- this.quantFile = quantFile;
- }
-
public void setSortOrder(boolean[] sortOrder) {
if(null == sortOrder) return;
this.sortOrder = new boolean[sortOrder.length];
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Thu Jan 23 01:06:21 2014
@@ -54,6 +54,7 @@ public class TezPrinter extends TezOpPla
mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
if (tezOper.inEdges.size() > 0) {
for (Entry<OperatorKey, TezEdgeDescriptor> inEdge : tezOper.inEdges.entrySet()) {
+ //TODO: Print other edge properties like custom partitioner
if (!inEdge.getValue().combinePlan.isEmpty()) {
mStream.println("# Combine plan on edge <" + inEdge.getKey() + ">");
PlanPrinter<PhysicalOperator, PhysicalPlan> printer =
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Thu Jan 23 01:06:21 2014
@@ -26,6 +26,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
@@ -42,10 +43,14 @@ import org.apache.pig.impl.util.Pair;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+@InterfaceAudience.Private
public class CombinerOptimizerUtil {
private static final String DISTINCT_UDF_CLASSNAME = org.apache.pig.builtin.Distinct.class.getName();
private static final Log LOG = LogFactory.getLog(CombinerOptimizerUtil.class);
+ private CombinerOptimizerUtil() {
+ }
+
/**
* Algebraic functions and distinct in nested plan of a foreach are
* partially computed in the map and combine phase. A new foreach statement