You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/04/12 02:20:22 UTC
svn commit: r1791060 [4/4] - in /pig/branches/spark: ./ bin/
src/docs/src/documentation/content/xdocs/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed Apr 12 02:20:20 2017
@@ -42,28 +42,28 @@ import org.apache.pig.tools.pigstats.Pig
public class PigInputFormatSpark extends PigInputFormat {
- @Override
- public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException,
- InterruptedException {
- initLogger();
- resetUDFContext();
- //PigSplit#conf is the default hadoop configuration, we need get the configuration
- //from context.getConfigration() to retrieve pig properties
- PigSplit pigSplit = (PigSplit) split;
- Configuration conf = context.getConfiguration();
- pigSplit.setConf(conf);
- //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX)
- //which will be used in POMergeCogroup#setup
- if (PigMapReduce.sJobContext == null) {
- PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
- }
- PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
- // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and
- // SchemaTupleBackend by reading properties from JobConf
- initialize(conf);
- return super.createRecordReader(split, context);
+@Override
+public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ initLogger();
+ resetUDFContext();
+ //PigSplit#conf is the default hadoop configuration, we need to get the configuration
+ //from context.getConfiguration() to retrieve pig properties
+ PigSplit pigSplit = (PigSplit) split;
+ Configuration conf = context.getConfiguration();
+ pigSplit.setConf(conf);
+ //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX)
+ //which will be used in POMergeCogroup#setup
+ if (PigMapReduce.sJobContext == null) {
+ PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
}
+ PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
+ // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and
+ // SchemaTupleBackend by reading properties from JobConf
+ initialize(conf);
+ return super.createRecordReader(split, context);
+}
private void initialize(Configuration jobConf) throws IOException {
MapRedUtil.setupUDFContext(jobConf);
@@ -73,12 +73,12 @@ public class PigInputFormatSpark extends
}
private void resetUDFContext() {
- UDFContext.getUDFContext().reset();
- }
+ UDFContext.getUDFContext().reset();
+ }
- private void initLogger() {
- PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
- pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
- PhysicalOperator.setPigLogger(pigHadoopLogger);
- }
-}
\ No newline at end of file
+ private void initLogger() {
+ PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+ PhysicalOperator.setPigLogger(pigHadoopLogger);
+ }
+}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Wed Apr 12 02:20:20 2017
@@ -291,7 +291,7 @@ public class TezPlanContainer extends Op
Set<TezOperator> splitters2 = new HashSet<>();
Set<TezOperator> processedPredecessors = new HashSet<>();
// Find predecessors which are splitters
- fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1);
+ fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1, false);
if (!splitters1.isEmpty()) {
// For the successor, traverse rest of the plan below it and
// search the predecessors of its successors to find any predecessor that might be a splitter.
@@ -300,7 +300,7 @@ public class TezPlanContainer extends Op
processedPredecessors.clear();
processedPredecessors.add(successor);
for (TezOperator succ : allSuccs) {
- fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2);
+ fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2, true);
}
// Find the common ones
splitters1.retainAll(splitters2);
@@ -309,7 +309,7 @@ public class TezPlanContainer extends Op
}
private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp,
- Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) {
+ Set<TezOperator> processedPredecessors, Set<TezOperator> splitters, boolean stopAtSplit) {
List<TezOperator> predecessors = plan.getPredecessors(tezOp);
if (predecessors != null) {
for (TezOperator pred : predecessors) {
@@ -319,9 +319,13 @@ public class TezPlanContainer extends Op
}
if (pred.isSplitter()) {
splitters.add(pred);
+ if (!stopAtSplit) {
+ processedPredecessors.add(pred);
+ fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters, stopAtSplit);
+ }
} else if (!pred.needSegmentBelow()) {
processedPredecessors.add(pred);
- fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters);
+ fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters, stopAtSplit);
}
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java Wed Apr 12 02:20:20 2017
@@ -37,6 +37,6 @@ public class AccumulatorOptimizer extend
@Override
public void visitTezOp(TezOperator tezOp) throws VisitorException {
- AccumulatorOptimizerUtil.addAccumulator(tezOp.plan);
+ AccumulatorOptimizerUtil.addAccumulator(tezOp.plan, tezOp.plan.getRoots());
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Wed Apr 12 02:20:20 2017
@@ -102,7 +102,8 @@ public class SecondaryKeyOptimizerTez ex
rearrangePlan = PlanHelper.getLocalRearrangePlanFromSplit(from.plan, connectingLR.getOperatorKey());
}
- SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(rearrangePlan, to.plan);
+ SecondaryKeyOptimizerUtil secondaryKeyOptUtil = new SecondaryKeyOptimizerUtil();
+ SecondaryKeyOptimizerInfo info = secondaryKeyOptUtil.applySecondaryKeySort(rearrangePlan, to.plan);
if (info != null) {
numSortRemoved += info.getNumSortRemoved();
numDistinctChanged += info.getNumDistinctChanged();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Wed Apr 12 02:20:20 2017
@@ -61,9 +61,8 @@ public class AccumulatorOptimizerUtil {
return batchSize;
}
- public static void addAccumulator(PhysicalPlan plan) {
+ public static void addAccumulator(PhysicalPlan plan, List<PhysicalOperator> pos) {
// See if this is a map-reduce job
- List<PhysicalOperator> pos = plan.getRoots();
if (pos == null || pos.size() == 0) {
return;
}
@@ -290,91 +289,4 @@ public class AccumulatorOptimizerUtil {
return false;
}
-
- public static void addAccumulatorSpark(PhysicalPlan plan) throws
- VisitorException {
- List<PhysicalOperator> pos = plan.getRoots();
- if (pos == null || pos.size() == 0) {
- return;
- }
-
- List<POGlobalRearrangeSpark> gras = PlanHelper.getPhysicalOperators(plan,
- POGlobalRearrangeSpark.class);
-
- for (POGlobalRearrange gra : gras) {
- addAccumulatorSparkForGRASubDAG(plan, gra);
- }
- }
-
-
- private static void addAccumulatorSparkForGRASubDAG(PhysicalPlan plan,
- POGlobalRearrange gra) throws VisitorException {
-
- List<PhysicalOperator> poPackages = plan.getSuccessors(gra);
-
- if (poPackages == null || poPackages.size() == 0) {
- return;
- }
- // See if this is a POPackage
- PhysicalOperator po_package = poPackages.get(0);
- if (!po_package.getClass().equals(POPackage.class)) {
- return;
- }
-
- Packager pkgr = ((POPackage) po_package).getPkgr();
- // Check that this is a standard package, not a subclass
- if (!pkgr.getClass().equals(Packager.class)) {
- return;
- }
-
- // if POPackage is for distinct, just return
- if (pkgr.isDistinct()) {
- return;
- }
-
- // if any input to POPackage is inner, just return
- boolean[] isInner = pkgr.getInner();
- for (boolean b : isInner) {
- if (b) {
- return;
- }
- }
-
- List<PhysicalOperator> l = plan.getSuccessors(po_package);
- // there should be only one POForEach
- if (l == null || l.size() == 0 || l.size() > 1) {
- return;
- }
-
- PhysicalOperator po_foreach = l.get(0);
- if (!(po_foreach instanceof POForEach)) {
- return;
- }
-
- boolean foundUDF = false;
- List<PhysicalPlan> list = ((POForEach) po_foreach).getInputPlans();
- for (PhysicalPlan p : list) {
- PhysicalOperator po = p.getLeaves().get(0);
-
- // only expression operators are allowed
- if (!(po instanceof ExpressionOperator)) {
- return;
- }
-
- if (((ExpressionOperator) po).containUDF()) {
- foundUDF = true;
- }
-
- if (!check(po)) {
- return;
- }
- }
-
- if (foundUDF) {
- // if all tests are passed, reducer can run in accumulative mode
- LOG.info("Reducer is to run in accumulative mode.");
- po_package.setAccumulative();
- po_foreach.setAccumulative();
- }
- }
}
\ No newline at end of file
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Wed Apr 12 02:20:20 2017
@@ -53,12 +53,12 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
-@InterfaceAudience.Private
+@InterfaceAudience.Public
public class SecondaryKeyOptimizerUtil {
private static Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName());
private static boolean isSparkMode;
- private SecondaryKeyOptimizerUtil() {
+ public SecondaryKeyOptimizerUtil() {
}
@@ -186,7 +186,7 @@ public class SecondaryKeyOptimizerUtil {
return result;
}
- public static SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException {
+ public SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException {
log.trace("Entering SecondaryKeyOptimizerUtil.addSecondaryKeySort");
SecondaryKeyOptimizerInfo secKeyOptimizerInfo = new SecondaryKeyOptimizerInfo();
List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>();
@@ -245,29 +245,7 @@ public class SecondaryKeyOptimizerUtil {
}
PhysicalOperator root = reduceRoots.get(0);
- PhysicalOperator currentNode = null;
- if (!isSparkMode) {
- if (!(root instanceof POPackage)) {
- log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
- return null;
- } else {
- currentNode = root;
- }
- } else {
- if (!(root instanceof POGlobalRearrange)) {
- log.debug("Expected reduce root to be a POGlobalRearrange, skip secondary key optimizing");
- return null;
- } else {
- List<PhysicalOperator> globalRearrangeSuccs = reducePlan
- .getSuccessors(root);
- if (globalRearrangeSuccs.size() == 1) {
- currentNode = globalRearrangeSuccs.get(0);
- } else {
- log.debug("Expected successor of a POGlobalRearrange is POPackage, skip secondary key optimizing");
- return null;
- }
- }
- }
+ PhysicalOperator currentNode = getCurrentNode(root,reducePlan);
// visit the POForEach of the reduce plan. We can have Limit and Filter
// in the middle
@@ -442,6 +420,16 @@ public class SecondaryKeyOptimizerUtil {
return secKeyOptimizerInfo;
}
+ protected PhysicalOperator getCurrentNode(PhysicalOperator root, PhysicalPlan reducePlan) {
+ PhysicalOperator currentNode = null;
+ if (!(root instanceof POPackage)) {
+ log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
+ } else {
+ currentNode = root;
+ }
+ return currentNode;
+ }
+
private static void setSecondaryPlan(PhysicalPlan plan, POLocalRearrange rearrange,
SortKeyInfo secondarySortKeyInfo) throws VisitorException {
// Put plan to project secondary key to the POLocalRearrange
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Wed Apr 12 02:20:20 2017
@@ -308,7 +308,7 @@ public class HBaseStorage extends LoadFu
//so we need check whether UDFContext.getUDFContext().getClientSystemProps()
//is null or not, if is null, defaultCaster =STRING_CASTER, otherwise is
//UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER)
- //Detail see PIG-4611
+ //Detail see PIG-4920
String defaultCaster = UDFContext.getUDFContext().getClientSystemProps() != null ? UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER) : STRING_CASTER;
String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
if (STRING_CASTER.equalsIgnoreCase(casterOption)) {
Modified: pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java Wed Apr 12 02:20:20 2017
@@ -22,6 +22,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
+import java.io.Serializable;
+
/**
* Class to hold code common to self spilling bags such as InternalCachedBag
*/
@@ -30,8 +32,7 @@ import org.apache.pig.classification.Int
public abstract class SelfSpillBag extends DefaultAbstractBag {
private static final long serialVersionUID = 1L;
// SelfSpillBag$MemoryLimits is not serializable
- //in spark mode, if we don't set memLimit transient, it will throw NotSerializableExecption(See PIG-4611)
- protected transient MemoryLimits memLimit;
+ protected MemoryLimits memLimit;
public SelfSpillBag(int bagCount) {
memLimit = new MemoryLimits(bagCount, -1);
@@ -49,10 +50,11 @@ public abstract class SelfSpillBag exten
* The number of objects that will fit into this memory limit is computed
* using the average memory size of the objects whose size is given to this
* class.
+ * In spark mode, MemoryLimits needs to implement the Serializable interface; otherwise a NotSerializableException will be thrown (see PIG-4611).
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
- public static class MemoryLimits {
+ public static class MemoryLimits implements Serializable {
private long maxMemUsage;
private long cacheLimit = Integer.MAX_VALUE;
Modified: pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java Wed Apr 12 02:20:20 2017
@@ -23,11 +23,14 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
public class UDFContext {
+ private static final Log LOG = LogFactory.getLog(UDFContext.class);
private Configuration jconf = null;
private HashMap<UDFContextKey, Properties> udfConfs;
private Properties clientSysProps;
@@ -204,6 +207,17 @@ public class UDFContext {
conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
}
+ /*
+ called by SparkEngineConf#writeObject
+ */
+ public String serialize() {
+ try {
+ return ObjectSerializer.serialize(udfConfs);
+ } catch (IOException e) {
+ LOG.error("UDFContext#serialize throws error ",e);
+ return null;
+ }
+ }
/**
* Populate the udfConfs field. This function is intended to
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOStore.java Wed Apr 12 02:20:20 2017
@@ -36,6 +36,7 @@ public class LOStore extends LogicalRela
private boolean isTmpStore;
private SortInfo sortInfo;
private final StoreFuncInterface storeFunc;
+ private boolean disambiguationEnabled = true;
public LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature) {
super("LOStore", plan);
@@ -43,6 +44,12 @@ public class LOStore extends LogicalRela
this.storeFunc = storeFunc;
this.signature = signature;
}
+
+ public LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature,
+ boolean disambiguationEnabled) {
+ this(plan, outputFileSpec, storeFunc, signature);
+ this.disambiguationEnabled = disambiguationEnabled;
+ }
public FileSpec getOutputSpec() {
return output;
@@ -55,6 +62,17 @@ public class LOStore extends LogicalRela
@Override
public LogicalSchema getSchema() throws FrontendException {
schema = ((LogicalRelationalOperator)plan.getPredecessors(this).get(0)).getSchema();
+
+ if (!disambiguationEnabled && schema != null && schema.getFields() != null) {
+ //If requested try and remove parent alias substring including colon(s)
+ for (LogicalSchema.LogicalFieldSchema field : schema.getFields()) {
+ if (field.alias == null || !field.alias.contains(":")) {
+ continue;
+ }
+ field.alias = field.alias.substring(field.alias.lastIndexOf(":") + 1);
+ }
+ }
+
return schema;
}
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Wed Apr 12 02:20:20 2017
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.pig.FuncSpec;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
@@ -94,7 +95,10 @@ public class ScalarVisitor extends AllEx
StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(interStorageFuncSpec);
String sig = LogicalPlanBuilder.newOperatorKey(scope);
stoFunc.setStoreFuncUDFContextSignature(sig);
- store = new LOStore(lp, fileSpec, stoFunc, sig);
+ boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+ getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
+
+ store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled);
store.setTmpStore(true);
lp.add( store );
lp.connect( refOp, store );
Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Wed Apr 12 02:20:20 2017
@@ -1003,8 +1003,10 @@ public class LogicalPlanBuilder {
fileNameMap.put(fileNameKey, absolutePath);
}
FileSpec fileSpec = new FileSpec(absolutePath, funcSpec);
+ boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+ getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
- LOStore op = new LOStore(plan, fileSpec, stoFunc, signature);
+ LOStore op = new LOStore(plan, fileSpec, stoFunc, signature, disambiguationEnabled);
return buildOp(loc, op, alias, inputAlias, null);
} catch(Exception ex) {
throw new ParserValidationException(intStream, loc, ex);
Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/QueryParserUtils.java Wed Apr 12 02:20:20 2017
@@ -70,8 +70,10 @@ public class QueryParserUtils {
fileName = removeQuotes( fileName );
FileSpec fileSpec = new FileSpec( fileName, funcSpec );
String sig = alias + "_" + LogicalPlanBuilder.newOperatorKey(scope);
+ boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties().
+ getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
stoFunc.setStoreFuncUDFContextSignature(sig);
- LOStore store = new LOStore(lp, fileSpec, stoFunc, sig);
+ LOStore store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled);
store.setAlias(alias);
try {
Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java Wed Apr 12 02:20:20 2017
@@ -23,6 +23,7 @@ import java.io.SequenceInputStream;
import java.util.Enumeration;
import jline.console.ConsoleReader;
+import jline.console.history.FileHistory;
/** Borrowed from jline.console.internal.ConsoleReaderInputStream. However,
* we cannot use ConsoleReaderInputStream directly since:
@@ -104,6 +105,9 @@ public class ConsoleReaderInputStream ex
if (buffer == null) {
buffer = reader.readLine().getBytes();
+
+ //Write current grunt buffer to pig history file
+ ((FileHistory)reader.getHistory()).flush();
}
if (buffer == null) {
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed Apr 12 02:20:20 2017
@@ -21,6 +21,7 @@ package org.apache.pig.tools.pigstats.sp
import java.util.List;
import java.util.Map;
+import org.apache.pig.tools.pigstats.*;
import scala.Option;
import org.apache.hadoop.conf.Configuration;
@@ -31,11 +32,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.PlanVisitor;
-import org.apache.pig.tools.pigstats.InputStats;
-import org.apache.pig.tools.pigstats.JobStats;
-import org.apache.pig.tools.pigstats.OutputStats;
-import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
@@ -72,7 +68,7 @@ public class SparkJobStats extends JobSt
long bytes = getOutputSize(poStore, conf);
long recordsCount = -1;
if (disableCounter == false) {
- recordsCount = SparkStatsUtil.getStoreSparkCounterValue(poStore);
+ recordsCount = SparkStatsUtil.getRecordCount(poStore);
}
OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(),
bytes, recordsCount, success);
@@ -88,7 +84,7 @@ public class SparkJobStats extends JobSt
long recordsCount = -1;
if (disableCounter == false) {
- recordsCount = SparkStatsUtil.getLoadSparkCounterValue(po);
+ recordsCount = SparkStatsUtil.getRecordCount(po);
}
long bytesRead = -1;
if (singleInput && stats.get("BytesRead") != null) {
@@ -190,13 +186,13 @@ public class SparkJobStats extends JobSt
if (inputMetricExist) {
results.put("BytesRead", bytesRead);
hdfsBytesRead = bytesRead;
- counters.incrCounter(FS_COUNTER_GROUP, MRPigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
+ counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
}
if (outputMetricExist) {
results.put("BytesWritten", bytesWritten);
hdfsBytesWritten = bytesWritten;
- counters.incrCounter(FS_COUNTER_GROUP, MRPigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
+ counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
}
if (shuffleReadMetricExist) {
@@ -331,7 +327,7 @@ public class SparkJobStats extends JobSt
private void initializeHadoopCounter() {
counters = new Counters();
Counters.Group fsGrp = counters.addGroup(FS_COUNTER_GROUP, FS_COUNTER_GROUP);
- fsGrp.addCounter(MRPigStatsUtil.HDFS_BYTES_READ, MRPigStatsUtil.HDFS_BYTES_READ, 0);
- fsGrp.addCounter(MRPigStatsUtil.HDFS_BYTES_WRITTEN, MRPigStatsUtil.HDFS_BYTES_WRITTEN, 0);
+ fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_READ, PigStatsUtil.HDFS_BYTES_READ, 0);
+ fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_WRITTEN, PigStatsUtil.HDFS_BYTES_WRITTEN, 0);
}
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Apr 12 02:20:20 2017
@@ -93,9 +93,7 @@ public class SparkPigStats extends PigSt
addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
jobSparkOperatorMap.put(jobStats, sparkOperator);
jobPlan.add(jobStats);
- if (e != null) {
- jobStats.setBackendException(e);
- }
+ jobStats.setBackendException(e);
}
public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e) {
@@ -103,9 +101,7 @@ public class SparkPigStats extends PigSt
jobStats.setSuccessful(isSuccess);
jobSparkOperatorMap.put(jobStats, sparkOperator);
jobPlan.add(jobStats);
- if (e != null) {
- jobStats.setBackendException(e);
- }
+ jobStats.setBackendException(e);
}
public void finish() {
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Apr 12 02:20:20 2017
@@ -37,10 +37,10 @@ import org.apache.spark.api.java.JavaSpa
public class SparkStatsUtil {
- public static final String SPARK_STORE_COUNTER_GROUP = "Spark Store Counters";
- public static final String SPARK_STORE_RECORD_COUNTER = "Output records in ";
- public static final String SPARK_INPUT_COUNTER_GROUP = "Spark Input Counters";
- public static final String SPARK_INPUT_RECORD_COUNTER = "Input records from ";
+ public static final String SPARK_STORE_COUNTER_GROUP = PigStatsUtil.MULTI_STORE_COUNTER_GROUP;
+ public static final String SPARK_STORE_RECORD_COUNTER = PigStatsUtil.MULTI_STORE_RECORD_COUNTER;
+ public static final String SPARK_INPUT_COUNTER_GROUP = PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP;
+ public static final String SPARK_INPUT_RECORD_COUNTER = PigStatsUtil.MULTI_INPUTS_RECORD_COUNTER;
public static void waitForJobAddStats(int jobID,
POStore poStore, SparkOperator sparkOperator,
@@ -71,7 +71,7 @@ public class SparkStatsUtil {
sparkContext, e);
}
- public static String getStoreSparkCounterName(POStore store) {
+ public static String getCounterName(POStore store) {
String shortName = PigStatsUtil.getShortName(store.getSFile().getFileName());
StringBuffer sb = new StringBuffer(SPARK_STORE_RECORD_COUNTER);
@@ -84,7 +84,7 @@ public class SparkStatsUtil {
return sb.toString();
}
- public static String getLoadSparkCounterName(POLoad load) {
+ public static String getCounterName(POLoad load) {
String shortName = PigStatsUtil.getShortName(load.getLFile().getFileName());
StringBuffer sb = new StringBuffer(SPARK_INPUT_RECORD_COUNTER);
@@ -95,15 +95,15 @@ public class SparkStatsUtil {
return sb.toString();
}
- public static long getStoreSparkCounterValue(POStore store) {
+ public static long getRecordCount(POStore store) {
SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
- return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getStoreSparkCounterName(store));
+ return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getCounterName(store));
}
- public static long getLoadSparkCounterValue(POLoad load) {
+ public static long getRecordCount(POLoad load) {
SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan());
- return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load))/loadersCount;
+ return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getCounterName(load))/loadersCount;
}
private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){
Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Wed Apr 12 02:20:20 2017
@@ -1003,7 +1003,7 @@ public class TestGrunt {
} else {
//In spark mode, We wrap ExecException to RunTimeException and is thrown out in JobGraphBuilder#sparkOperToRDD,
//So unwrap the exception here
- assertTrue(((ExecException) e.getCause().getCause()).getErrorCode() == 6017);
+ assertTrue(((ExecException) e.getCause()).getErrorCode() == 6017);
}
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java Wed Apr 12 02:20:20 2017
@@ -651,7 +651,7 @@ public class TestProjectRange {
};
Schema s = pigServer.dumpSchema("f");
Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
- Util.isSparkExecType(cluster.getExecType()));
+ Util.isSparkExecType(cluster.getExecType()));
}
/**
@@ -738,8 +738,8 @@ public class TestProjectRange {
"(1,{(11,21,31,41,51),(10,20,30,40,50)})",
};
Schema s = pigServer.dumpSchema("f");
- Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
- Util.isSparkExecType(cluster.getExecType()));
+ Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
@@ -928,8 +928,8 @@ public class TestProjectRange {
" g = group l1 by .. c, l2 by .. c;"
;
String expectedSchStr = "grp: (a: int,b: long,c: int)," +
- "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
- "l2: {t : (a: int,b: long,c: int,d: int,e: int)}";
+ "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
+ "l2: {t : (a: int,b: long,c: int,d: int,e: int)}";
Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
compileAndCompareSchema(expectedSch, query, "g");
@@ -948,8 +948,8 @@ public class TestProjectRange {
};
Iterator<Tuple> it = pigServer.openIterator("g");
Schema s = pigServer.dumpSchema("g");
- Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
- Util.isSparkExecType(cluster.getExecType()));
+ Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
/**
@@ -1017,7 +1017,7 @@ public class TestProjectRange {
Iterator<Tuple> it = pigServer.openIterator("g");
Schema s = pigServer.dumpSchema("g");
Util.checkQueryOutputs(it,expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
- Util.isSparkExecType(cluster.getExecType()));
+ Util.isSparkExecType(cluster.getExecType()));
}
@Test
@@ -1068,8 +1068,8 @@ public class TestProjectRange {
};
Iterator<Tuple> it = pigServer.openIterator("lim");
Schema s = pigServer.dumpSchema("lim");
- Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
- Util.isSparkExecType(cluster.getExecType()));
+ Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
@@ -1132,7 +1132,7 @@ public class TestProjectRange {
Iterator<Tuple> it = pigServer.openIterator("g");
Schema s = pigServer.dumpSchema("g");
Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
- Util.isSparkExecType(cluster.getExecType()));
+ Util.isSparkExecType(cluster.getExecType()));
}
private void setAliasesToNull(Schema schema) {
@@ -1171,8 +1171,8 @@ public class TestProjectRange {
};
Iterator<Tuple> it = pigServer.openIterator("j");
Schema s = pigServer.dumpSchema("j");
- Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
- Util.isSparkExecType(cluster.getExecType()));
+ Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
@Test
@@ -1200,8 +1200,8 @@ public class TestProjectRange {
};
Iterator<Tuple> it = pigServer.openIterator("j");
Schema s = pigServer.dumpSchema("j");
- Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
- Util.isSparkExecType(cluster.getExecType()));
+ Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
@Test
@@ -1213,7 +1213,7 @@ public class TestProjectRange {
" l2 = load '" + INP_FILE_5FIELDS + "';" +
" g = cogroup l1 by ($0 .. ), l2 by ($0 .. );";
Util.checkExceptionMessage(query, "g", "Cogroup/Group by '*' or 'x..' " +
- "(range of columns to the end) " +
+ "(range of columns to the end) " +
"is only allowed if the input has a schema");
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java Wed Apr 12 02:20:20 2017
@@ -611,7 +611,7 @@ public class TestPruneColumn {
"({(5)},{})"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
- Util.isSparkExecType(Util.getLocalTestMode()));
+ Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
}
@@ -963,7 +963,7 @@ public class TestPruneColumn {
"((2,5,2))"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
- Util.isSparkExecType(Util.getLocalTestMode()));
+ Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
}
@@ -1513,7 +1513,7 @@ public class TestPruneColumn {
"({(3),(3),(3)},3)"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("G")),
- Util.isSparkExecType(Util.getLocalTestMode()));
+ Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
@@ -1535,7 +1535,7 @@ public class TestPruneColumn {
"(5,{(2,5,2)})"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D"))
- ,Util.isSparkExecType(Util.getLocalTestMode()));
+ , Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"}));
}
@@ -1865,7 +1865,7 @@ public class TestPruneColumn {
"(2,5,2,2)"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
- Util.isSparkExecType(Util.getLocalTestMode()));
+ Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
Modified: pig/branches/spark/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSchema.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSchema.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSchema.java Wed Apr 12 02:20:20 2017
@@ -29,7 +29,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.UUID;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.data.DataType;
@@ -42,10 +44,28 @@ import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.MergeMode;
import org.apache.pig.parser.ParserException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestSchema {
+ private static MiniGenericCluster cluster;
+ private static PigServer pigServer;
+
+ @BeforeClass
+ public static void setupTestCluster() throws Exception { // builds one mini cluster and one PigServer and keeps both in the static fields used by the tests of this class
+ cluster = MiniGenericCluster.buildCluster();
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ }
+
+ @AfterClass
+ public static void tearDownTestCluster() throws Exception { // shuts down the cluster stored in the static field by setupTestCluster
+ cluster.shutDown();
+ }
+
@Test
public void testSchemaEqual1() {
@@ -660,8 +680,6 @@ public class TestSchema {
@Test
public void testSchemaSerialization() throws IOException {
- MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
- PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
String inputFileName = "testSchemaSerialization-input.txt";
String[] inputData = new String[] { "foo\t1", "hello\t2" };
Util.createInputFile(cluster, inputFileName, inputData);
@@ -673,7 +691,6 @@ public class TestSchema {
Tuple t = it.next();
assertEquals("{a: {(f1: chararray,f2: int)}}", t.get(0));
}
- cluster.shutDown();
}
@Test
@@ -938,4 +955,79 @@ public class TestSchema {
assertTrue(schemaString.equals(s2));
}
}
+
+ @Test
+ public void testDisabledDisambiguationContainsNoColons() throws IOException { // compares dumped schemas with the disambiguation property unset and then set to "false"
+ resetDisambiguationTestPropertyOverride();
+
+ String inputFileName = "testPrepend-input.txt";
+ String[] inputData = new String[]{"apple\t1\tred", "orange\t2\torange", "kiwi\t3\tgreen", "orange\t4\torange"};
+ Util.createInputFile(cluster, inputFileName, inputData);
+
+ String script = "A = LOAD '" + inputFileName + "' AS (fruit:chararray, foo:int, color: chararray);" +
+ "B = LOAD '" + inputFileName + "' AS (id:chararray, bar:int);" +
+ "C = GROUP A BY (fruit,color);" +
+ "D = FOREACH C GENERATE FLATTEN(group), AVG(A.foo);" +
+ "D2 = FOREACH C GENERATE FLATTEN(group), AVG(A.foo) as avgFoo;" +
+ "E = JOIN B BY id, D BY group::fruit;" +
+ "F = UNION ONSCHEMA B, D2;" +
+ "G = CROSS B, D2;";
+
+ Util.registerMultiLineQuery(pigServer, script);
+
+ //With the property unset (default), the dumped schema of E is expected to carry the B:: and D::group:: prefixes
+ assertEquals("{B::id: chararray,B::bar: int,D::group::fruit: chararray,D::group::color: chararray,double}", pigServer.dumpSchema("E").toString());
+
+ //Set the property to "false": the same flatten/join schema is now expected without any "::" prefix, and E must still return rows
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false");
+ assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,double}", pigServer.dumpSchema("E").toString());
+ assertTrue(pigServer.openIterator("E").hasNext());
+
+ //Union and cross are expected to dump prefix-free field names as well (property is still "false" here)
+ assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("F").toString());
+ assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("G").toString());
+
+ }
+
+ @Test
+ public void testEnabledDisambiguationPassesForDupeAliases() throws IOException {
+ resetDisambiguationTestPropertyOverride();
+
+ checkForDupeAliases(); // only registers relations A, B and C; the assertions are the two lines below
+
+ //With the property unset, the identically named fields of the self-join are told apart by their A:: / B:: prefixes and C returns rows
+ assertEquals("{A::id: chararray,A::val: int,B::id: chararray,B::val: int}", pigServer.dumpSchema("C").toString());
+ assertTrue(pigServer.openIterator("C").hasNext());
+ }
+
+ @Test
+ public void testDisabledDisambiguationFailsForDupeAliases() throws IOException { // a join of identically named fields must be rejected once the disambiguation property is "false"
+ resetDisambiguationTestPropertyOverride();
+
+ try {
+ checkForDupeAliases();
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false");
+ pigServer.dumpSchema("C");
+ Assert.fail("Expected FrontendException: duplicate aliases must be rejected when schema disambiguation is disabled"); // without this the test passes silently when nothing is thrown
+ } catch (FrontendException e){
+ Assert.assertEquals("Duplicate schema alias: id in \"fake\"",e.getCause().getMessage());
+ }
+ }
+
+ private static void checkForDupeAliases() throws IOException { // NOTE: despite the name this asserts nothing; it creates the input file and registers the self-join script
+ String inputFileName = "testPrependFail-input" + UUID.randomUUID().toString() + ".txt"; // random suffix gives every call its own input file name
+ String[] inputData = new String[]{"foo\t1", "bar\t2"};
+ Util.createInputFile(cluster, inputFileName, inputData);
+
+ String script = "A = LOAD '" + inputFileName + "' AS (id:chararray, val:int);" +
+ "B = LOAD '" + inputFileName + "' AS (id:chararray, val:int);" +
+ "C = JOIN A by id, B by id;"; // both sides of the join declare the same field names (id, val)
+
+ Util.registerMultiLineQuery(pigServer, script);
+ }
+
+ private static void resetDisambiguationTestPropertyOverride() {
+ //pigServer is a static field shared by the tests of this class, so remove any value an earlier test set for the property
+ pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE);
+ }
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java Wed Apr 12 02:20:20 2017
@@ -270,7 +270,7 @@ public abstract class TestSecondarySort
"(1,{(1,2,3),(1,2,4),(1,3,4)})"
};
Util.checkQueryOutputs(iter, expected, org.apache
- .pig.newplan.logical.Util.translateSchema(s),
+ .pig.newplan.logical.Util.translateSchema(s),
Util.isSparkExecType(Util.getLocalTestMode()));
Util.deleteFile(cluster, clusterPath);
}
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1791060&r1=1791059&r2=1791060&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Wed Apr 12 02:20:20 2017
@@ -203,6 +203,25 @@ public class TestTezCompiler {
resetScope();
resetFileLocalizer();
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld");
+
+ // Three levels of splits - a, a1 and a2.
+ // One split above and one split below a1 which is the split to be replaced with tmp store.
+ query =
+ "a = load 'file:///tmp/input';" +
+ "store a into 'file:///tmp/pigoutput/Dir0';" +
+ "a1 = filter a by $0 == 5;" +
+ "store a1 into 'file:///tmp/pigoutput/Dir1';" +
+ "a2 = distinct a1;" +
+ "store a2 into 'file:///tmp/pigoutput/Dir2';" +
+ "a3 = group a2 by $0;" +
+ "store a3 into 'file:///tmp/pigoutput/Dir3';" +
+ "b = load 'file:///tmp/pigoutput/Dir3';" +
+ "c = join a1 by $0, b by $0;" +
+ "store c into 'file:///tmp/pigoutput/Dir4';";
+
+ resetScope();
+ resetFileLocalizer();
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-7.gld");
}
@Test