You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/29 00:58:01 UTC
svn commit: r1546477 [1/2] - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relati...
Author: cheolsoo
Date: Thu Nov 28 23:58:00 2013
New Revision: 1546477
URL: http://svn.apache.org/r1546477
Log:
PIG-3527: Allow PigProcessor to handle multiple inputs (mwagner via cheolsoo)
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POSimpleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java
Removed:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/FileInputHandler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/InputHandler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ShuffledInputHandler.java
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
pig/branches/tez/src/org/apache/pig/data/BinInterSedes.java
pig/branches/tez/src/org/apache/pig/data/DefaultTuple.java
pig/branches/tez/test/e2e/pig/tests/tez.conf
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Nov 28 23:58:00 2013
@@ -2208,8 +2208,7 @@ public class MRCompiler extends PhyPlanV
POPackage pkg = new POPackage(new OperatorKey(scope,
nig.getNextNodeId(scope)));
LitePackager pkgr = new LitePackager();
- pkgr.setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE
- : keyType);
+ pkgr.setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE : keyType);
pkg.setPkgr(pkgr);
pkg.setNumInps(1);
mro.reducePlan.add(pkg);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java Thu Nov 28 23:58:00 2013
@@ -46,14 +46,8 @@ public class PigBigDecimalRawComparator
@Override
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got "
- + conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf)conf;
try {
- mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+ mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
"pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder "
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java Thu Nov 28 23:58:00 2013
@@ -46,14 +46,8 @@ public class PigBigIntegerRawComparator
@Override
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got "
- + conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf)conf;
try {
- mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+ mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
"pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder "
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java Thu Nov 28 23:58:00 2013
@@ -40,14 +40,8 @@ public class PigBooleanRawComparator ext
mWrappedComp = new BooleanWritable.Comparator();
}
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " +
- conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf)conf;
try {
- mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+ mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
"pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder " +
@@ -76,7 +70,7 @@ public class PigBooleanRawComparator ext
if (b1[s1] == 0 && b2[s2] == 0) {
byte byte1 = b1[s1 + 1];
byte byte2 = b2[s2 + 1];
- rc = (byte1 < byte2) ? -1 : ((byte1 > byte2) ? 1 : 0);
+ rc = (byte1 < byte2) ? -1 : ((byte1 > byte2) ? 1 : 0);
} else {
// For sorting purposes two nulls are equal.
if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Thu Nov 28 23:58:00 2013
@@ -42,14 +42,8 @@ public class PigBytesRawComparator exten
}
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " +
- conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf)conf;
try {
- mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+ mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
"pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder " +
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java Thu Nov 28 23:58:00 2013
@@ -45,14 +45,8 @@ public class PigDateTimeRawComparator ex
}
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got "
- + conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf) conf;
try {
- mAsc = (boolean[]) ObjectSerializer.deserialize(jconf
+ mAsc = (boolean[]) ObjectSerializer.deserialize(conf
.get("pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder "
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java Thu Nov 28 23:58:00 2013
@@ -43,14 +43,8 @@ public class PigDoubleRawComparator exte
}
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " +
- conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf)conf;
try {
- mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+ mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
"pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder " +
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java Thu Nov 28 23:58:00 2013
@@ -43,14 +43,8 @@ public class PigFloatRawComparator exten
}
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " +
- conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf)conf;
try {
- mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+ mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
"pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder " +
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java Thu Nov 28 23:58:00 2013
@@ -40,14 +40,8 @@ public class PigIntRawComparator extends
}
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " +
- conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf)conf;
try {
- mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+ mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
"pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder " +
@@ -76,7 +70,7 @@ public class PigIntRawComparator extends
if (b1[s1] == 0 && b2[s2] == 0) {
int int1 = readInt(b1, s1 + 1);
int int2 = readInt(b2, s2 + 1);
- rc = (int1 < int2) ? -1 : ((int1 > int2) ? 1 : 0);
+ rc = (int1 < int2) ? -1 : ((int1 > int2) ? 1 : 0);
} else {
// For sorting purposes two nulls are equal.
if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java Thu Nov 28 23:58:00 2013
@@ -45,14 +45,8 @@ public class PigLongRawComparator extend
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " +
- conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf)conf;
try {
- mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+ mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
"pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder " +
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java Thu Nov 28 23:58:00 2013
@@ -34,11 +34,6 @@ public class PigSecondaryKeyComparator e
@Override
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf) conf;
try {
Class<? extends TupleRawComparator> mComparatorClass = TupleFactory.getInstance().tupleRawComparatorClass();
mComparator = mComparatorClass.newInstance();
@@ -47,7 +42,7 @@ public class PigSecondaryKeyComparator e
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
- mComparator.setConf(jconf);
+ mComparator.setConf(conf);
}
protected PigSecondaryKeyComparator() {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Thu Nov 28 23:58:00 2013
@@ -44,14 +44,8 @@ public class PigTextRawComparator extend
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " +
- conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf)conf;
try {
- mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+ mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
"pig.sortOrder"));
} catch (IOException ioe) {
String msg = "Unable to deserialize pig.sortOrder";
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java Thu Nov 28 23:58:00 2013
@@ -47,13 +47,8 @@ public class PigTupleDefaultRawComparato
}
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf) conf;
try {
- mAsc = (boolean[]) ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+ mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
throw new RuntimeException(ioe);
@@ -75,7 +70,7 @@ public class PigTupleDefaultRawComparato
public boolean hasComparedTupleNull() {
return mHasNullField;
}
-
+
private static final BinInterSedes bis = new BinInterSedes();
/**
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java Thu Nov 28 23:58:00 2013
@@ -47,13 +47,8 @@ public class PigTupleSortComparator exte
@Override
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf) conf;
try {
- mAsc = (boolean[]) ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+ mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
throw new RuntimeException(ioe);
@@ -86,7 +81,7 @@ public class PigTupleSortComparator exte
throw new RuntimeException(e);
}
}
- ((Configurable)mComparator).setConf(jconf);
+ ((Configurable)mComparator).setConf(conf);
}
@Override
@@ -165,4 +160,4 @@ public class PigTupleSortComparator exte
return 0;
}
}
-}
\ No newline at end of file
+}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POSimpleTezLoad.java?rev=1546477&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POSimpleTezLoad.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POSimpleTezLoad.java Thu Nov 28 23:58:00 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * POSimpleTezLoad is used on the backend to read tuples from a Tez MRInput
+ */
+public class POSimpleTezLoad extends POLoad implements TezLoad {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private String inputKey;
+ private MRInput input;
+ private KeyValueReader reader;
+
+ private Result res;
+
+ public POSimpleTezLoad(OperatorKey k) {
+ super(k);
+ res = new Result();
+ }
+
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs,
+ Configuration conf)
+ throws ExecException {
+ LogicalInput logInput = inputs.get(inputKey);
+ if (logInput == null || !(logInput instanceof MRInput)) {
+ throw new ExecException("POSimpleTezLoad only accepts MRInputs");
+ }
+ input = (MRInput) logInput;
+ try {
+ reader = input.getReader();
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+ try {
+ if (!reader.next()) {
+ res.result = null;
+ res.returnStatus = POStatus.STATUS_EOP;
+ } else {
+ Tuple next = (Tuple) reader.getCurrentValue();
+ res.result = next;
+ res.returnStatus = POStatus.STATUS_OK;
+ }
+ return res;
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
+ }
+
+ public void setInputKey(String inputKey) {
+ this.inputKey = inputKey;
+ }
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Thu Nov 28 23:58:00 2013
@@ -102,7 +102,11 @@ public class CombinerPackager extends Pa
@Override
public Result getNext() throws ExecException {
- //Create numInputs bags
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+
+ // Create numInputs bags
Object[] fields = new Object[mBags.length];
for (int i = 0; i < mBags.length; i++) {
if (mBags[i]) fields[i] = createDataBag(numBags);
@@ -131,6 +135,8 @@ public class CombinerPackager extends Pa
}
}
+ detachInput();
+
// The successor of the POCombinerPackage as of
// now SHOULD be a POForeach which has been adjusted
// to look for its inputs by projecting from the corresponding
@@ -147,7 +153,7 @@ public class CombinerPackager extends Pa
}
@Override
- protected Tuple getValueTuple(Object key, NullableTuple ntup, int index)
+ public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
throws ExecException {
return (Tuple) ntup.getValueAsPigType();
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java Thu Nov 28 23:58:00 2013
@@ -216,7 +216,7 @@ public class JoinPackager extends Packag
}
@Override
- void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
throws ExecException {
super.attachInput(key, bags, readOnce);
this.newKey = true;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Thu Nov 28 23:58:00 2013
@@ -112,6 +112,10 @@ public class LitePackager extends Packag
*/
@Override
public Result getNext() throws ExecException {
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+
Tuple res;
//Construct the output tuple by appending
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java Thu Nov 28 23:58:00 2013
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -128,6 +129,9 @@ public class MultiQueryPackager extends
*/
@Override
public Result getNext() throws ExecException {
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
byte origIndex = myKey.getIndex();
@@ -159,6 +163,7 @@ public class MultiQueryPackager extends
Result res = pkgr.getNext();
pkgr.detachInput();
+ detachInput();
Tuple tuple = (Tuple)res.result;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Nov 28 23:58:00 2013
@@ -70,12 +70,12 @@ public class POPackage extends PhysicalO
transient Iterator<NullableTuple> tupIter;
//The key being worked on
- Object key;
+ protected Object key;
//The number of inputs to this
//co-group. 0 indicates a distinct, which means there will only be a
//key, no value.
- int numInputs;
+ protected int numInputs;
// A mapping of input index to key information got from LORearrange
// for that index. The Key information is a pair of boolean, Map.
@@ -152,7 +152,7 @@ public class POPackage extends PhysicalO
public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
try {
tupIter = inp;
- key = pkgr.getKey(k.getValueAsPigType());
+ key = pkgr.getKey(k);
inputAttached = true;
} catch (Exception e) {
throw new RuntimeException(
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Thu Nov 28 23:58:00 2013
@@ -12,6 +12,7 @@ import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.util.Pair;
public class Packager implements Serializable, Cloneable {
@@ -64,18 +65,18 @@ public class Packager implements Seriali
private PackageType pkgType;
protected static final BagFactory mBagFactory = BagFactory.getInstance();
- protected static final TupleFactory mTupleFactory = TupleFactory
- .getInstance();
+ protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
- Object getKey(Object key) throws ExecException {
+ public Object getKey(PigNullableWritable key) throws ExecException {
+ Object keyObject = key.getValueAsPigType();
if (useSecondaryKey) {
- return ((Tuple) key).get(0);
+ return ((Tuple) keyObject).get(0);
} else {
- return key;
+ return keyObject;
}
}
- void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
throws ExecException {
this.key = key;
this.bags = bags;
@@ -83,6 +84,9 @@ public class Packager implements Seriali
}
public Result getNext() throws ExecException {
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
Tuple res;
if (isDistinct()) {
@@ -117,21 +121,19 @@ public class Packager implements Seriali
}
}
}
+ detachInput();
Result r = new Result();
r.returnStatus = POStatus.STATUS_OK;
- // if (!isAccumulative())
- // r.result = illustratorMarkup(null, res, 0);
- // else
r.result = res;
return r;
}
- void detachInput() {
+ public void detachInput() {
key = null;
bags = null;
}
- protected Tuple getValueTuple(Object key, NullableTuple ntup, int index)
+ public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
throws ExecException {
// Need to make a copy of the value, as hadoop uses the same ntup
// to represent each value.
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1546477&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Thu Nov 28 23:58:00 2013
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+
+public class POShuffleTezLoad extends POPackage implements TezLoad {
+
+ private static final long serialVersionUID = 1L;
+
+ List<String> inputKeys = new ArrayList<String>();
+
+ List<ShuffledMergedInput> inputs = new ArrayList<ShuffledMergedInput>();
+ List<KeyValuesReader> readers = new ArrayList<KeyValuesReader>();
+ List<Boolean> finished = new ArrayList<Boolean>();
+
+ private boolean[] readOnce;
+
+ Result res;
+
+ protected static final TupleFactory tf = TupleFactory.getInstance();
+
+ WritableComparator comparator = null;
+
+ public POShuffleTezLoad(OperatorKey k, POPackage pack) {
+ super(k);
+ setPkgr(pack.getPkgr());
+ this.setNumInps(pack.getNumInps());
+ }
+
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
+ throws ExecException {
+ readOnce = new boolean[numInputs];
+ for (int i = 0; i < numInputs; i++) {
+ readOnce[i] = false;
+ }
+
+ try {
+ comparator = ReflectionUtils.newInstance(
+ TezDagBuilder.comparatorForKeyType(pkgr.getKeyType()), conf);
+ } catch (JobCreationException e) {
+ throw new ExecException(e);
+ }
+ try {
+ // TODO: Only take the inputs which are actually specified.
+ for (LogicalInput input : inputs.values()) {
+ ShuffledMergedInput smInput = (ShuffledMergedInput) input;
+ this.inputs.add(smInput);
+ this.readers.add(smInput.getReader());
+ }
+
+ for (int i = 0; i < numInputs; i++) {
+ finished.add(!readers.get(i).next());
+ }
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+ Result res = pkgr.getNext();
+ while (res.returnStatus == POStatus.STATUS_EOP) {
+ PigNullableWritable minimum = null;
+ boolean newData = false;
+ try {
+ for (int i = 0; i < numInputs; i++) {
+ if (!finished.get(i)) {
+ newData = true;
+ PigNullableWritable current = (PigNullableWritable) readers
+ .get(i).getCurrentKey();
+ if (minimum == null
+ || comparator.compare(minimum, current) > 0) {
+ minimum = current;
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
+
+ if (!newData) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+
+ key = pkgr.getKey(minimum);
+
+ DataBag[] bags = new DataBag[numInputs];
+
+ try {
+ for (int i = 0; i < numInputs; i++) {
+ if (!finished.get(i)) {
+ PigNullableWritable current = (PigNullableWritable) readers
+ .get(i).getCurrentKey();
+ if (comparator.compare(minimum, current) == 0) {
+ DataBag bag = new InternalCachedBag(numInputs);
+ Iterable<Object> vals = readers.get(i).getCurrentValues();
+ for (Object val : vals) {
+ NullableTuple nTup = (NullableTuple) val;
+ int index = nTup.getIndex();
+ Tuple tup = pkgr.getValueTuple(key, nTup, index);
+ bag.add(tup);
+ }
+ finished.set(i, !readers.get(i).next());
+ bags[i] = bag;
+ } else {
+ bags[i] = new InternalCachedBag(numInputs);
+ }
+ } else {
+ bags[i] = new InternalCachedBag(numInputs);
+ }
+ }
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
+ pkgr.attachInput(key, bags, readOnce);
+ res = pkgr.getNext();
+ }
+ return res;
+ }
+
+ public void setInputKeys(List<String> inputKeys) {
+ this.inputKeys = inputKeys;
+ }
+
+ public void addInputKey(String inputKey) {
+ inputKeys.add(inputKey);
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return true;
+ }
+
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Thu Nov 28 23:58:00 2013
@@ -18,9 +18,9 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
-import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -31,12 +31,14 @@ import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POSimpleTezLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -70,8 +72,6 @@ public class PigProcessor implements Log
private boolean shuffle;
private byte keyType;
- private InputHandler input;
-
private Configuration conf;
@Override
@@ -81,8 +81,6 @@ public class PigProcessor implements Log
conf = TezUtils.createConfFromUserPayload(payload);
PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));
- input = createInputHandler(conf);
-
UDFContext.getUDFContext().addJobConf(conf);
UDFContext.getUDFContext().deserialize();
@@ -106,7 +104,7 @@ public class PigProcessor implements Log
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
- input.initialize(conf, inputs);
+ initializeInputs(inputs);
initializeOutputs(outputs);
@@ -124,34 +122,24 @@ public class PigProcessor implements Log
}
}
- while (input.next()){
- Tuple inputTuple = input.getCurrentTuple();
- if (execPlan.isEmpty()) {
- writeResult(inputTuple);
- continue;
- }
-
- for (PhysicalOperator root : roots) {
- root.attachInput(inputTuple);
- }
-
- runPipeline(leaf);
- }
+ runPipeline(leaf);
for (MROutput fileOutput : fileOutputs){
fileOutput.commit();
}
}
- private InputHandler createInputHandler(Configuration conf) throws PigException {
- Class<? extends InputHandler> inputClass;
- try {
- inputClass = (Class<? extends InputHandler>)
- Class.forName(conf.get("pig.input.handler.class"));
- Constructor<? extends InputHandler> constructor = inputClass.getConstructor();
- return constructor.newInstance();
- } catch (Exception e) {
- throw new PigException("Could not instantiate input handler", e);
+ private void initializeInputs(Map<String, LogicalInput> inputs)
+ throws IOException {
+ //getPhysicalOperators only accept C extends PhysicalOperator, so we can't change it to look for TezLoad
+ // TODO: Change that.
+ LinkedList<POSimpleTezLoad> tezLds = PlanHelper.getPhysicalOperators(execPlan, POSimpleTezLoad.class);
+ for (POSimpleTezLoad tezLd : tezLds){
+ tezLd.attachInputs(inputs, conf);
+ }
+ LinkedList<POShuffleTezLoad> shuffles = PlanHelper.getPhysicalOperators(execPlan, POShuffleTezLoad.class);
+ for (POShuffleTezLoad shuffle : shuffles){
+ shuffle.attachInputs(inputs, conf);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Nov 28 23:58:00 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -47,34 +48,25 @@ import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigBigDecimalWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigBigIntegerWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigBooleanWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigCharArrayWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigDBAWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigDateTimeWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigDoubleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigFloatWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigDecimalWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigIntegerWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBooleanWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingCharArrayWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDBAWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDateTimeWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDoubleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingFloatWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingIntWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingLongWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingTupleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigIntWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigLongWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigDecimalRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigIntegerRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBooleanRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBytesRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigDateTimeRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigDoubleRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFloatRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigIntRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigLongRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POSimpleTezLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -89,6 +81,7 @@ import org.apache.pig.impl.io.FileLocali
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -119,12 +112,20 @@ public class TezDagBuilder extends TezOp
private DAG dag;
private Map<String, LocalResource> localResources;
private PigContext pc;
+ private Configuration globalConf;
- public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag, Map<String, LocalResource> localResources) {
+ private String scope;
+ private NodeIdGenerator nig;
+
+ public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
+ Map<String, LocalResource> localResources) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
this.pc = pc;
+ this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
this.localResources = localResources;
this.dag = dag;
+ this.scope = plan.getRoots().get(0).getOperatorKey().getScope();
+ this.nig = NodeIdGenerator.getGenerator();
}
@Override
@@ -135,11 +136,12 @@ public class TezDagBuilder extends TezOp
to = newVertex(tezOp);
dag.addVertex(to);
} catch (IOException e) {
- throw new VisitorException("Cannot create vertex for " + tezOp.name(), e);
+ throw new VisitorException("Cannot create vertex for "
+ + tezOp.name(), e);
}
// Connect the new vertex with predecessor vertices
- TezOperPlan tezPlan = getPlan();
+ TezOperPlan tezPlan = getPlan();
List<TezOperator> predecessors = tezPlan.getPredecessors(tezOp);
if (predecessors != null) {
for (TezOperator predecessor : predecessors) {
@@ -150,8 +152,8 @@ public class TezDagBuilder extends TezOp
try {
prop = newEdge(predecessor, tezOp);
} catch (IOException e) {
- throw new VisitorException("Cannot create edge from " +
- predecessor.name() + " to " + tezOp.name(), e);
+ throw new VisitorException("Cannot create edge from "
+ + predecessor.name() + " to " + tezOp.name(), e);
}
Edge edge = new Edge(from, to, prop);
dag.addEdge(edge);
@@ -161,12 +163,14 @@ public class TezDagBuilder extends TezOp
/**
* Return EdgeProperty that connects two vertices.
+ *
* @param from
* @param to
* @return EdgeProperty
* @throws IOException
*/
- private EdgeProperty newEdge(TezOperator from, TezOperator to) throws IOException {
+ private EdgeProperty newEdge(TezOperator from, TezOperator to)
+ throws IOException {
TezEdgeDescriptor edge = to.inEdges.get(from.getOperatorKey());
PhysicalPlan combinePlan = edge.combinePlan;
@@ -180,107 +184,127 @@ public class TezDagBuilder extends TezOp
out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
}
- return new EdgeProperty(
- edge.dataMovementType,
- edge.dataSourceType,
- edge.schedulingType,
- out, in);
+ return new EdgeProperty(edge.dataMovementType, edge.dataSourceType,
+ edge.schedulingType, out, in);
}
private void addCombiner(PhysicalPlan combinePlan, Configuration conf) throws IOException {
POPackage combPack = (POPackage)combinePlan.getRoots().get(0);
setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf);
- POLocalRearrange combRearrange = (POLocalRearrange)combinePlan.getLeaves().get(0);
+ POLocalRearrange combRearrange = (POLocalRearrange) combinePlan
+ .getLeaves().get(0);
setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf);
- LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(combinePlan, combPack);
+ LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
+ combinePlan, combPack);
lrDiscoverer.visit();
combinePlan.remove(combPack);
- conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
- conf.set(MRJobConfig.COMBINE_CLASS_ATTR, PigCombiner.Combine.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS,
+ MRCombiner.class.getName());
+ conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
+ PigCombiner.Combine.class.getName());
conf.setBoolean("mapred.mapper.new-api", true);
conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
conf.set("pig.combinePlan", ObjectSerializer.serialize(combinePlan));
conf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
- conf.set("pig.map.keytype", ObjectSerializer.serialize(new byte[] {combRearrange.getKeyType()}));
+ conf.set("pig.map.keytype", ObjectSerializer
+ .serialize(new byte[] { combRearrange.getKeyType() }));
}
private Vertex newVertex(TezOperator tezOp) throws IOException {
- ProcessorDescriptor procDesc = new ProcessorDescriptor(tezOp.getProcessorName());
+ ProcessorDescriptor procDesc = new ProcessorDescriptor(
+ tezOp.getProcessorName());
// Pass physical plans to vertex as user payload.
- Configuration conf = new Configuration();
- // We won't actually use this job, but we need it to talk with the Load Store funcs
+ Configuration payloadConf = new Configuration();
+ // We won't actually use this job, but we need it to talk with the Load
+ // Store funcs
@SuppressWarnings("deprecation")
- Job job = new Job(conf);
+ Job job = new Job(payloadConf);
ArrayList<POStore> storeLocations = new ArrayList<POStore>();
Path tmpLocation = null;
- boolean loads = processLoads(tezOp, conf, job);
- conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-
- conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
- conf.setBoolean("mapred.mapper.new-api", true);
- conf.setClass("mapreduce.inputformat.class", PigInputFormat.class, InputFormat.class);
+ List<POLoad> loads = processLoads(tezOp, payloadConf, job);
+ payloadConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
- // We need to remove all the POStores from the exec plan and later add them as outputs of the vertex
- LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class);
+ payloadConf.set("udf.import.list",
+ ObjectSerializer.serialize(PigContext.getPackageImportList()));
+ payloadConf.setBoolean("mapred.mapper.new-api", true);
+ payloadConf.setClass("mapreduce.inputformat.class",
+ PigInputFormat.class, InputFormat.class);
+
+ // We need to remove all the POStores from the exec plan and later add
+ // them as outputs of the vertex
+ LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
+ tezOp.plan, POStore.class);
- for (POStore st: stores) {
+ for (POStore st : stores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
sFunc.setStoreLocation(st.getSFile().getFileName(), job);
}
- if (stores.size() == 1){
+ if (stores.size() == 1) {
POStore st = stores.get(0);
- if(!pc.inIllustrator)
+ if (!pc.inIllustrator)
tezOp.plan.remove(st);
// set out filespecs
String outputPathString = st.getSFile().getFileName();
- if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
- conf.set("pig.streaming.log.dir",
- new Path(outputPathString, JobControlCompiler.LOG_DIR).toString());
+ if (!outputPathString.contains("://")
+ || outputPathString.startsWith("hdfs://")) {
+ payloadConf.set("pig.streaming.log.dir", new Path(
+ outputPathString, JobControlCompiler.LOG_DIR)
+ .toString());
} else {
- String tmpLocationStr = FileLocalizer
- .getTemporaryPath(pc).toString();
+ String tmpLocationStr = FileLocalizer.getTemporaryPath(pc)
+ .toString();
tmpLocation = new Path(tmpLocationStr);
- conf.set("pig.streaming.log.dir",
- new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
+ payloadConf.set("pig.streaming.log.dir", new Path(tmpLocation,
+ JobControlCompiler.LOG_DIR).toString());
}
- conf.set("pig.streaming.task.output.dir", outputPathString);
+ payloadConf.set("pig.streaming.task.output.dir", outputPathString);
} else if (stores.size() > 0) { // multi store case
log.info("Setting up multi store job");
- String tmpLocationStr = FileLocalizer
- .getTemporaryPath(pc).toString();
+ String tmpLocationStr = FileLocalizer.getTemporaryPath(pc)
+ .toString();
tmpLocation = new Path(tmpLocationStr);
- boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
+ boolean disableCounter = payloadConf.getBoolean(
+ "pig.disable.counter", false);
if (disableCounter) {
log.info("Disable Pig custom output counters");
}
int idx = 0;
- for (POStore sto: storeLocations) {
+ for (POStore sto : storeLocations) {
sto.setDisableCounter(disableCounter);
sto.setMultiStore(true);
sto.setIndex(idx++);
}
- conf.set("pig.streaming.log.dir",
- new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
- conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
+ payloadConf.set("pig.streaming.log.dir", new Path(tmpLocation,
+ JobControlCompiler.LOG_DIR).toString());
+ payloadConf.set("pig.streaming.task.output.dir",
+ tmpLocation.toString());
}
if (!pc.inIllustrator) {
- // Unset inputs for POStore, otherwise, exec plan will be unnecessarily deserialized
- for (POStore st: stores) { st.setInputs(null); st.setParentPlan(null);}
- // We put them in the reduce because PigOutputCommitter checks the ID of the task to see if it's a map, and if not, calls the reduce committers.
- conf.set(JobControlCompiler.PIG_MAP_STORES, ObjectSerializer.serialize(new ArrayList<POStore>()));
- conf.set(JobControlCompiler.PIG_REDUCE_STORES, ObjectSerializer.serialize(stores));
+ // Unset inputs for POStore, otherwise, exec plan will be
+ // unnecessarily deserialized
+ for (POStore st : stores) {
+ st.setInputs(null);
+ st.setParentPlan(null);
+ }
+ // We put them in the reduce because PigOutputCommitter checks the
+ // ID of the task to see if it's a map, and if not, calls the reduce
+ // committers.
+ payloadConf.set(JobControlCompiler.PIG_MAP_STORES,
+ ObjectSerializer.serialize(new ArrayList<POStore>()));
+ payloadConf.set(JobControlCompiler.PIG_REDUCE_STORES,
+ ObjectSerializer.serialize(stores));
}
// For all shuffle outputs, configure the classes
@@ -290,30 +314,52 @@ public class TezDagBuilder extends TezOp
// different keys.
if (leaves.size() == 1 && leaves.get(0) instanceof POLocalRearrange) {
byte keyType = ((POLocalRearrange)leaves.get(0)).getKeyType();
- setIntermediateOutputKeyValue(keyType, conf);
- conf.set("pig.reduce.key.type", Byte.toString(keyType));
+ setIntermediateOutputKeyValue(keyType, payloadConf);
+ payloadConf.set("pig.reduce.key.type", Byte.toString(keyType));
}
// Configure the classes for incoming shuffles to this TezOp
List<PhysicalOperator> roots = tezOp.plan.getRoots();
if (roots.size() == 1 && roots.get(0) instanceof POPackage) {
POPackage pack = (POPackage) roots.get(0);
+
+ List<PhysicalOperator> succsList = tezOp.plan.getSuccessors(pack);
+ if (succsList != null) {
+ succsList = new ArrayList<PhysicalOperator>(succsList);
+ }
byte keyType = pack.getPkgr().getKeyType();
tezOp.plan.remove(pack);
- conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
- conf.set("pig.reduce.key.type", Byte.toString(keyType));
- setIntermediateInputKeyValue(keyType, conf);
- conf.setClass("pig.input.handler.class", ShuffledInputHandler.class, InputHandler.class);
- conf.set("pig.reduce.key.type", Byte.toString(keyType));
- } else {
- conf.setClass("pig.input.handler.class", FileInputHandler.class, InputHandler.class);
- }
+ payloadConf.set("pig.reduce.package",
+ ObjectSerializer.serialize(pack));
+ payloadConf.set("pig.reduce.key.type", Byte.toString(keyType));
+ setIntermediateInputKeyValue(keyType, payloadConf);
+ // TODO: Move POShuffleTezLoad upstream to Physical Plan generation
+ POShuffleTezLoad shuffleLoad = new POShuffleTezLoad(
+ new OperatorKey(scope, nig.getNextNodeId(scope)), pack);
+ tezOp.plan.add(shuffleLoad);
+
+ if (succsList != null) {
+ for (PhysicalOperator succs : succsList) {
+ tezOp.plan.connect(shuffleLoad, succs);
+ }
+ }
- conf.setClass("mapreduce.outputformat.class", PigOutputFormat.class, OutputFormat.class);
+ @SuppressWarnings("rawtypes")
+ Class<? extends WritableComparable> keyClass = HDataType
+ .getWritableComparableTypes(pack.getPkgr().getKeyType())
+ .getClass();
+ payloadConf.set(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+ keyClass.getName());
+ payloadConf.set(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+ NullableTuple.class.getName());
+ selectInputComparator(payloadConf, pack.getPkgr().getKeyType());
+ }
if(tezOp.isGlobalSort() || tezOp.isLimitAfterSort()){
if (tezOp.isGlobalSort()) {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(globalConf);
Path quantFilePath = new Path(tezOp.getQuantFile() + "/part-r-00000");
FileStatus fstat = fs.getFileStatus(quantFilePath);
LocalResource quantFileResource = LocalResource.newInstance(
@@ -323,48 +369,66 @@ public class TezDagBuilder extends TezOp
fstat.getLen(),
fstat.getModificationTime());
localResources.put(quantFilePath.getName(), quantFileResource);
- conf.set("pig.quantilesFile", fstat.getPath().toString());
- conf.set("pig.sortOrder",
+ payloadConf.set("pig.quantilesFile", fstat.getPath().toString());
+ payloadConf.set("pig.sortOrder",
ObjectSerializer.serialize(tezOp.getSortOrder()));
- conf.setClass("mapreduce.job.partitioner.class", WeightedRangePartitioner.class,
+ payloadConf.setClass("mapreduce.job.partitioner.class",
+ WeightedRangePartitioner.class,
Partitioner.class);
}
}
+ payloadConf.setClass("mapreduce.outputformat.class",
+ PigOutputFormat.class, OutputFormat.class);
+
// Serialize the execution plan
- conf.set(PigProcessor.PLAN, ObjectSerializer.serialize(tezOp.plan));
- UDFContext.getUDFContext().serialize(conf);
+ payloadConf.set(PigProcessor.PLAN,
+ ObjectSerializer.serialize(tezOp.plan));
+ UDFContext.getUDFContext().serialize(payloadConf);
// Take our assembled configuration and create a vertex
- byte[] userPayload = TezUtils.createUserPayloadFromConf(conf);
+ byte[] userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
procDesc.setUserPayload(userPayload);
- // Can only set parallelism here if the parallelism isn't derived from splits
- int parallelism = !loads ? tezOp.requestedParallelism : -1;
+ // Can only set parallelism here if the parallelism isn't derived from
+ // splits
+ int parallelism = (loads.size() == 0) ? tezOp.requestedParallelism : -1;
Vertex vertex = new Vertex(tezOp.name(), procDesc, parallelism,
Resource.newInstance(tezOp.requestedMemory, tezOp.requestedCpu));
Map<String, String> env = new HashMap<String, String>();
- MRHelpers.updateEnvironmentForMRTasks(conf, env, true);
+ MRHelpers.updateEnvironmentForMRTasks(globalConf, env, true);
vertex.setTaskEnvironment(env);
vertex.setTaskLocalResources(localResources);
// This could also be reduce, but we need to choose one
- // TODO: Create new or use existing settings that are specifically for Tez.
- vertex.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
-
- // Right now there can only be one of each of these. Will need to be more generic when there can be more.
- if (loads){
- vertex.addInput("PigInput", new InputDescriptor(MRInput.class.getName()).setUserPayload(MRHelpers.createMRInputPayload(userPayload, null)), MRInputAMSplitGenerator.class);
- }
- if (!stores.isEmpty()){
- vertex.addOutput("PigOutput", new OutputDescriptor(MROutput.class.getName()));
+ // TODO: Create new or use existing settings that are specifically for
+ // Tez.
+ vertex.setJavaOpts(MRHelpers.getMapJavaOpts(globalConf));
+
+ // Inputs are now added per POLoad below; the output is still limited
+ // to a single "PigOutput" and will need to be more generic later.
+ for (POLoad ld : loads) {
+
+ // TODO: These should get the globalConf, or a merged version that
+ // keeps settings like pig.maxCombinedSplitSize
+ vertex.addInput(ld.getOperatorKey().toString(),
+ new InputDescriptor(MRInput.class.getName())
+ .setUserPayload(MRHelpers.createMRInputPayload(
+ userPayload, null)),
+ MRInputAMSplitGenerator.class);
+ }
+ if (!stores.isEmpty()) {
+ vertex.addOutput("PigOutput",
+ new OutputDescriptor(MROutput.class.getName()));
}
return vertex;
}
/**
- * Do the final configuration of LoadFuncs and store what goes where. This will need to be changed as the inputs get un-bundled
+ * Do the final configuration of LoadFuncs and store what goes where. This
+ * will need to be changed as the inputs get un-bundled
+ *
* @param tezOp
* @param conf
* @param job
@@ -372,32 +436,34 @@ public class TezDagBuilder extends TezOp
* @throws VisitorException
* @throws IOException
*/
- private boolean processLoads(TezOperator tezOp, Configuration conf, Job job)
- throws VisitorException, IOException {
+ private List<POLoad> processLoads(TezOperator tezOp, Configuration conf,
+ Job job) throws VisitorException, IOException {
ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
ArrayList<String> inpSignatureLists = new ArrayList<String>();
ArrayList<Long> inpLimits = new ArrayList<Long>();
- List<POLoad> lds = PlanHelper.getPhysicalOperators(tezOp.plan, POLoad.class);
+ List<POLoad> lds = PlanHelper.getPhysicalOperators(tezOp.plan,
+ POLoad.class);
- if(lds!=null && lds.size()>0){
+ if (lds != null && lds.size() > 0) {
for (POLoad ld : lds) {
LoadFunc lf = ld.getLoadFunc();
lf.setLocation(ld.getLFile().getFileName(), job);
- //Store the inp filespecs
+ // Store the inp filespecs
inp.add(ld.getLFile());
}
}
- if(lds!=null && lds.size()>0){
+ if (lds != null && lds.size() > 0) {
for (POLoad ld : lds) {
- //Store the target operators for tuples read
- //from this input
- List<PhysicalOperator> ldSucs = tezOp.plan.getSuccessors(ld);
+ // Store the target operators for tuples read
+ // from this input
+ List<PhysicalOperator> ldSucs = new ArrayList<PhysicalOperator>(
+ tezOp.plan.getSuccessors(ld));
List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
- if(ldSucs!=null){
+ if (ldSucs != null) {
for (PhysicalOperator operator2 : ldSucs) {
ldSucKeys.add(operator2.getOperatorKey());
}
@@ -405,8 +471,18 @@ public class TezDagBuilder extends TezOp
inpTargets.add(ldSucKeys);
inpSignatureLists.add(ld.getSignature());
inpLimits.add(ld.getLimit());
- //Remove the POLoad from the plan
+ // Remove the POLoad from the plan
tezOp.plan.remove(ld);
+ // Now add the input handling operator for the Tez backend
+ // TODO: Move this upstream to the PhysicalPlan generation
+ POSimpleTezLoad tezLoad = new POSimpleTezLoad(new OperatorKey(
+ scope, nig.getNextNodeId(scope)));
+ tezLoad.setInputKey(ld.getOperatorKey().toString());
+ tezOp.plan.add(tezLoad);
+ for (PhysicalOperator sucs : ldSucs) {
+ tezOp.plan.connect(tezLoad, sucs);
+ }
+
}
}
@@ -415,69 +491,69 @@ public class TezDagBuilder extends TezOp
conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
- return (lds.size() > 0);
+ return lds;
}
@SuppressWarnings("rawtypes")
- private void setIntermediateInputKeyValue(byte keyType, Configuration conf) throws JobCreationException, ExecException {
- Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(keyType).getClass();
- conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, keyClass.getName());
- conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, NullableTuple.class.getName());
- selectInputComparator(keyType, conf);
+ private void setIntermediateInputKeyValue(byte keyType, Configuration conf)
+ throws JobCreationException, ExecException {
+ Class<? extends WritableComparable> keyClass = HDataType
+ .getWritableComparableTypes(keyType).getClass();
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+ keyClass.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+ NullableTuple.class.getName());
+ selectInputComparator(conf, keyType);
}
@SuppressWarnings("rawtypes")
- private void setIntermediateOutputKeyValue(byte keyType, Configuration conf) throws JobCreationException, ExecException {
- Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(keyType).getClass();
- conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClass.getName());
- conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, NullableTuple.class.getName());
- conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
+ private void setIntermediateOutputKeyValue(byte keyType, Configuration conf)
+ throws JobCreationException, ExecException {
+ Class<? extends WritableComparable> keyClass = HDataType
+ .getWritableComparableTypes(keyType).getClass();
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+ keyClass.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+ NullableTuple.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
+ MRPartitioner.class.getName());
selectOutputComparator(keyType, conf);
}
- private void selectInputComparator(byte keyType, Configuration conf) throws JobCreationException {
- //TODO: Handle sorting like in JobControlCompiler
+ static Class<? extends WritableComparator> comparatorForKeyType(byte keyType)
+ throws JobCreationException {
+ // TODO: Handle sorting like in JobControlCompiler
switch (keyType) {
case DataType.BOOLEAN:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigBooleanWritableComparator.class, RawComparator.class);
- break;
+ return PigBooleanRawComparator.class;
case DataType.INTEGER:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigIntWritableComparator.class, RawComparator.class);
- break;
+ return PigIntRawComparator.class;
case DataType.BIGINTEGER:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigBigIntegerWritableComparator.class, RawComparator.class);
- break;
+ return PigBigIntegerRawComparator.class;
case DataType.BIGDECIMAL:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigBigDecimalWritableComparator.class, RawComparator.class);
- break;
+ return PigBigDecimalRawComparator.class;
case DataType.LONG:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigLongWritableComparator.class, RawComparator.class);
- break;
+ return PigLongRawComparator.class;
case DataType.FLOAT:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigFloatWritableComparator.class, RawComparator.class);
- break;
+ return PigFloatRawComparator.class;
case DataType.DOUBLE:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigDoubleWritableComparator.class, RawComparator.class);
- break;
+ return PigDoubleRawComparator.class;
case DataType.DATETIME:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigDateTimeWritableComparator.class, RawComparator.class);
- break;
+ return PigDateTimeRawComparator.class;
case DataType.CHARARRAY:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigCharArrayWritableComparator.class, RawComparator.class);
- break;
+ return PigTextRawComparator.class;
case DataType.BYTEARRAY:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigDBAWritableComparator.class, RawComparator.class);
- break;
+ return PigBytesRawComparator.class;
case DataType.MAP:
int errCode = 1068;
@@ -485,8 +561,7 @@ public class TezDagBuilder extends TezOp
throw new JobCreationException(msg, errCode, PigException.INPUT);
case DataType.TUPLE:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigTupleWritableComparator.class, RawComparator.class);
- break;
+ return PigTupleSortComparator.class;
case DataType.BAG:
errCode = 1068;
@@ -500,80 +575,19 @@ public class TezDagBuilder extends TezOp
}
}
- private void selectOutputComparator(byte keyType, Configuration conf) throws JobCreationException {
- //TODO: Handle sorting like in JobControlCompiler
-
- switch (keyType) {
- case DataType.BOOLEAN:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigBooleanWritableComparator.class, RawComparator.class);
- conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingBooleanWritableComparator.class, RawComparator.class);
- break;
-
- case DataType.INTEGER:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigIntWritableComparator.class, RawComparator.class);
- conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingIntWritableComparator.class, RawComparator.class);
- break;
-
- case DataType.BIGINTEGER:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigBigIntegerWritableComparator.class, RawComparator.class);
- conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingBigIntegerWritableComparator.class, RawComparator.class);
- break;
-
- case DataType.BIGDECIMAL:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigBigDecimalWritableComparator.class, RawComparator.class);
- conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingBigDecimalWritableComparator.class, RawComparator.class);
- break;
-
- case DataType.LONG:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigLongWritableComparator.class, RawComparator.class);
- conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingLongWritableComparator.class, RawComparator.class);
- break;
-
- case DataType.FLOAT:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigFloatWritableComparator.class, RawComparator.class);
- conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingFloatWritableComparator.class, RawComparator.class);
- break;
-
- case DataType.DOUBLE:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigDoubleWritableComparator.class, RawComparator.class);
- conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingDoubleWritableComparator.class, RawComparator.class);
- break;
-
- case DataType.DATETIME:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigDateTimeWritableComparator.class, RawComparator.class);
- conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingDateTimeWritableComparator.class, RawComparator.class);
- break;
-
- case DataType.CHARARRAY:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigCharArrayWritableComparator.class, RawComparator.class);
- conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingCharArrayWritableComparator.class, RawComparator.class);
- break;
-
- case DataType.BYTEARRAY:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigDBAWritableComparator.class, RawComparator.class);
- conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingDBAWritableComparator.class, RawComparator.class);
- break;
-
- case DataType.MAP:
- int errCode = 1068;
- String msg = "Using Map as key not supported.";
- throw new JobCreationException(msg, errCode, PigException.INPUT);
-
- case DataType.TUPLE:
- conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigTupleWritableComparator.class, RawComparator.class);
- conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingTupleWritableComparator.class, RawComparator.class);
- break;
-
- case DataType.BAG:
- errCode = 1068;
- msg = "Using Bag as key not supported.";
- throw new JobCreationException(msg, errCode, PigException.INPUT);
+ void selectInputComparator(Configuration conf, byte keyType)
+ throws JobCreationException {
+ // TODO: Handle sorting like in JobControlCompiler
+ conf.setClass(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+ comparatorForKeyType(keyType), RawComparator.class);
+ }
- default:
- errCode = 2036;
- msg = "Unhandled key type " + DataType.findTypeName(keyType);
- throw new JobCreationException(msg, errCode, PigException.BUG);
- }
+ void selectOutputComparator(byte keyType, Configuration conf)
+ throws JobCreationException {
+ // TODO: Handle sorting like in JobControlCompiler
+ conf.setClass(
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+ comparatorForKeyType(keyType), RawComparator.class);
}
}
-