You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC
svn commit: r1783988 [13/24] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Modified: pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java Wed Feb 22 09:43:41 2017
@@ -51,10 +51,9 @@ public class SpillableMemoryManager impl
private static final Log log = LogFactory.getLog(SpillableMemoryManager.class);
- private static final int ONE_GB = 1024 * 1024 * 1024;
private static final int UNUSED_MEMORY_THRESHOLD_DEFAULT = 350 * 1024 * 1024;
- private static final double MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7;
- private static final double COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7;
+ private static final float MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7f;
+ private static final float COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7f;
private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
// References to spillables with size
@@ -86,7 +85,7 @@ public class SpillableMemoryManager impl
// fraction of the total heap used for the threshold to determine
// if we want to perform an extra gc before the spill
- private double extraGCThresholdFraction = 0.05;
+ private float extraGCThresholdFraction = 0.05f;
private long extraGCSpillSizeThreshold = 0L;
private volatile boolean blockRegisterOnSpill = false;
@@ -142,7 +141,7 @@ public class SpillableMemoryManager impl
* @param unusedMemoryThreshold
* Unused memory size below which we want to get notifications
*/
- private void configureMemoryThresholds(double memoryThresholdFraction, double collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
+ private void configureMemoryThresholds(float memoryThresholdFraction, float collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
long tenuredHeapSize = tenuredHeap.getUsage().getMax();
memoryThresholdSize = (long)(tenuredHeapSize * memoryThresholdFraction);
collectionThresholdSize = (long)(tenuredHeapSize * collectionMemoryThresholdFraction);
@@ -184,8 +183,8 @@ public class SpillableMemoryManager impl
spillFileSizeThreshold = conf.getLong("pig.spill.size.threshold", spillFileSizeThreshold);
gcActivationSize = conf.getLong("pig.spill.gc.activation.size", gcActivationSize);
- double memoryThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, MEMORY_THRESHOLD_FRACTION_DEFAULT);
- double collectionThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, COLLECTION_THRESHOLD_FRACTION_DEFAULT);
+ float memoryThresholdFraction = conf.getFloat(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, MEMORY_THRESHOLD_FRACTION_DEFAULT);
+ float collectionThresholdFraction = conf.getFloat(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, COLLECTION_THRESHOLD_FRACTION_DEFAULT);
long unusedMemoryThreshold = conf.getLong(PigConfiguration.PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE, UNUSED_MEMORY_THRESHOLD_DEFAULT);
configureMemoryThresholds(memoryThresholdFraction, collectionThresholdFraction, unusedMemoryThreshold);
}
@@ -199,7 +198,7 @@ public class SpillableMemoryManager impl
// used - heapmax/2 + heapmax/4
long toFree = 0L;
if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
- toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5);
+ toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5);
//log
String msg = "memory handler call- Usage threshold "
@@ -211,7 +210,7 @@ public class SpillableMemoryManager impl
log.debug(msg);
}
} else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
- toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5);
+ toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5);
//log
String msg = "memory handler call - Collection threshold "
Modified: pig/branches/spark/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/Utils.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/Utils.java Wed Feb 22 09:43:41 2017
@@ -48,6 +48,7 @@ import org.apache.hadoop.io.compress.BZi
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadFunc;
@@ -93,18 +94,10 @@ public class Utils {
return System.getProperty("java.vendor").contains("IBM");
}
- public static boolean isHadoop23() {
- String version = org.apache.hadoop.util.VersionInfo.getVersion();
- if (version.matches("\\b0\\.23\\..+\\b"))
- return true;
- return false;
- }
-
- public static boolean isHadoop2() {
- String version = org.apache.hadoop.util.VersionInfo.getVersion();
- if (version.matches("\\b2\\.\\d+\\..+"))
- return true;
- return false;
+ public static boolean is64bitJVM() {
+ String arch = System.getProperties().getProperty("sun.arch.data.model",
+ System.getProperty("com.ibm.vm.bitmode"));
+ return arch != null && arch.equals("64");
}
/**
@@ -574,6 +567,11 @@ public class Utils {
return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
}
+ public static boolean isLocal(Configuration conf) {
+ return conf.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)
+ || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
+ }
+
// PIG-3929 use parameter substitution for pig properties similar to Hadoop Configuration
// Following code has been borrowed from Hadoop's Configuration#substituteVars
private static Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}");
@@ -697,4 +695,15 @@ public class Utils {
DateTimeZone.setDefault(DateTimeZone.forID(dtzStr));
}
}
+
+ /**
+ * Add shutdown hook that runs before the FileSystem cache shutdown happens.
+ *
+ * @param hook code to execute during shutdown
+ * @param priority Priority over the FileSystem.SHUTDOWN_HOOK_PRIORITY
+ */
+ public static void addShutdownHookWithPriority(Runnable hook, int priority) {
+ ShutdownHookManager.get().addShutdownHook(hook,
+ FileSystem.SHUTDOWN_HOOK_PRIORITY + priority);
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java Wed Feb 22 09:43:41 2017
@@ -118,6 +118,8 @@ public class AvroStorageDataConversionUt
return ByteBuffer.wrap(((DataByteArray) o).get());
case FIXED:
return new GenericData.Fixed(s, ((DataByteArray) o).get());
+ case ENUM:
+ return new GenericData.EnumSymbol(s,o.toString());
default:
if (DataType.findType(o) == DataType.DATETIME) {
return ((DateTime) o).getMillis();
Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Wed Feb 22 09:43:41 2017
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import java.io.DataInput;
import java.io.DataOutput;
@@ -49,6 +50,7 @@ import java.util.Map;
public final class AvroTupleWrapper <T extends IndexedRecord>
implements Tuple {
private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class);
+ private TupleFactory mTupleFactory = TupleFactory.getInstance();
/**
* The Avro object wrapped in the pig Tuple.
@@ -64,9 +66,9 @@ public final class AvroTupleWrapper <T e
}
@Override
- public void write(final DataOutput o) throws IOException {
- throw new IOException(
- this.getClass().toString() + ".write called, but not implemented yet");
+ public void write(DataOutput out) throws IOException {
+ Tuple t = mTupleFactory.newTupleNoCopy(getAll());
+ t.write(out);
}
@SuppressWarnings("rawtypes")
Modified: pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java Wed Feb 22 09:43:41 2017
@@ -98,13 +98,17 @@ public abstract class FilterExtractor {
public void visit() throws FrontendException {
// we will visit the leaf and it will recursively walk the plan
LogicalExpression leaf = (LogicalExpression)originalPlan.getSources().get( 0 );
- // if the leaf is a unary operator it should be a FilterFunc in
- // which case we don't try to extract partition filter conditions
- if(leaf instanceof BinaryExpression) {
- // recursively traverse the tree bottom up
- // checkPushdown returns KeyState which is pair of LogicalExpression
- BinaryExpression binExpr = (BinaryExpression)leaf;
- KeyState finale = checkPushDown(binExpr);
+
+ // recursively traverse the tree bottom up
+ // checkPushdown returns KeyState which is pair of LogicalExpression
+ KeyState finale = null;
+ if (leaf instanceof BinaryExpression) {
+ finale = checkPushDown((BinaryExpression) leaf);
+ } else if (leaf instanceof UnaryExpression) {
+ finale = checkPushDown((UnaryExpression) leaf);
+ }
+
+ if (finale != null) {
this.filterExpr = finale.filterExpr;
this.pushdownExpr = getExpression(finale.pushdownExpr);
}
@@ -278,12 +282,22 @@ public abstract class FilterExtractor {
if (unaryExpr instanceof CastExpression) {
return checkPushDown(unaryExpr.getExpression());
}
- if (unaryExpr instanceof IsNullExpression) {
- state.pushdownExpr = unaryExpr;
- state.filterExpr = null;
- } else if (unaryExpr instanceof NotExpression) {
- state.pushdownExpr = unaryExpr;
- state.filterExpr = null;
+ // For IsNull, the child may not be a supported expression, e.g. MapLookupExpression.
+ // For NotExpression, the child, C, is broken into expressions P and F such that C = P AND F
+ // Consequently, NOT C = NOT P OR NOT F, which can't be expressed as an AND so both must be
+ // pushed or both used as a filter.
+ // For both cases, this expr can be pushed if and only if the entire child can be.
+ if (unaryExpr instanceof IsNullExpression || unaryExpr instanceof NotExpression) {
+ KeyState childState = checkPushDown(unaryExpr.getExpression());
+ if (childState.filterExpr == null) {
+ // only push down if the entire expression can be pushed
+ state.pushdownExpr = unaryExpr;
+ state.filterExpr = null;
+ } else {
+ removeFromFilteredPlan(childState.filterExpr);
+ state.filterExpr = addToFilterPlan(unaryExpr);
+ state.pushdownExpr = null;
+ }
} else {
state.filterExpr = addToFilterPlan(unaryExpr);
state.pushdownExpr = null;
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Wed Feb 22 09:43:41 2017
@@ -323,6 +323,7 @@ public class ExpToPhyTranslationVisitor
public void visit( CastExpression op ) throws FrontendException {
POCast pCast = new POCast(new OperatorKey(DEFAULT_SCOPE, nodeGen
.getNextNodeId(DEFAULT_SCOPE)));
+ pCast.addOriginalLocation(op.getFieldSchema().alias, op.getLocation()) ;
// physOp.setAlias(op.getAlias());
currentPlan.add(pCast);
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java Wed Feb 22 09:43:41 2017
@@ -95,7 +95,8 @@ public class MapLookupExpression extends
LogicalFieldSchema predFS = successor.getFieldSchema();
if (predFS!=null) {
if (predFS.type==DataType.MAP && predFS.schema!=null) {
- return (predFS.schema.getField(0));
+ fieldSchema = predFS.schema.getField(0);
+ return fieldSchema;
}
else {
fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Wed Feb 22 09:43:41 2017
@@ -37,6 +37,7 @@ public class LOGenerate extends LogicalR
// to store uid in mUserDefinedSchema
private List<LogicalSchema> mUserDefinedSchema = null;
private List<LogicalSchema> outputPlanSchemas = null;
+ private List<LogicalSchema> expSchemas = null;
// If LOGenerate generate new uid, cache it here.
// This happens when expression plan does not have complete schema, however,
// user give complete schema in ForEach statement in script
@@ -71,6 +72,7 @@ public class LOGenerate extends LogicalR
schema = new LogicalSchema();
outputPlanSchemas = new ArrayList<LogicalSchema>();
+ expSchemas = new ArrayList<LogicalSchema>();
for(int i=0; i<outputPlans.size(); i++) {
LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
@@ -93,19 +95,17 @@ public class LOGenerate extends LogicalR
fieldSchema = exp.getFieldSchema().deepCopy();
expSchema = new LogicalSchema();
- if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG)||!flattenFlags[i]) {
+ if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG && fieldSchema.type != DataType.MAP) || !flattenFlags[i]) {
// if type is primitive, just add to schema
- if (fieldSchema!=null)
+ if (fieldSchema != null)
expSchema.addField(fieldSchema);
- else
- expSchema = null;
} else {
- // if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator
+ // if bag/tuple/map don't have inner schema, after flatten, we don't have schema for the entire operator
if (fieldSchema.schema==null) {
expSchema = null;
}
else {
- // if we come here, we get a BAG/Tuple with flatten, extract inner schema of the tuple as expSchema
+ // if we come here, we get a BAG/Tuple/Map with flatten, extract inner schema of the tuple as expSchema
List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
if (flattenFlags[i]) {
if (fieldSchema.type == DataType.BAG) {
@@ -117,13 +117,23 @@ public class LOGenerate extends LogicalR
fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
}
}
+ } else if (fieldSchema.type == DataType.MAP) {
+ //should only contain 1 schemafield for Map's value
+ innerFieldSchemas = fieldSchema.schema.getFields();
+ LogicalSchema.LogicalFieldSchema fsForValue = innerFieldSchemas.get(0);
+ fsForValue.alias = fieldSchema.alias + "::value";
+
+ LogicalSchema.LogicalFieldSchema fsForKey = new LogicalFieldSchema(
+ fieldSchema.alias + "::key" , null, DataType.CHARARRAY, fieldSchema.uid);
+
+ expSchema.addField(fsForKey);
} else { // DataType.TUPLE
innerFieldSchemas = fieldSchema.schema.getFields();
for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
}
}
-
+
for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
expSchema.addField(fs);
}
@@ -137,6 +147,7 @@ public class LOGenerate extends LogicalR
if (expSchema!=null && expSchema.size()==0)
expSchema = null;
LogicalSchema planSchema = new LogicalSchema();
+ expSchemas.add(expSchema);
if (mUserDefinedSchemaCopy!=null) {
LogicalSchema mergedSchema = new LogicalSchema();
// merge with userDefinedSchema
@@ -146,12 +157,6 @@ public class LOGenerate extends LogicalR
fs.stampFieldSchema();
mergedSchema.addField(new LogicalFieldSchema(fs));
}
- for (LogicalFieldSchema fs : mergedSchema.getFields()) {
- if (fs.type == DataType.NULL){
- //this is the use case where a new alias has been specified by user
- fs.type = DataType.BYTEARRAY;
- }
- }
} else {
// Merge uid with the exp field schema
@@ -163,8 +168,12 @@ public class LOGenerate extends LogicalR
mergedSchema.mergeUid(expSchema);
}
- for (LogicalFieldSchema fs : mergedSchema.getFields())
+ for (LogicalFieldSchema fs : mergedSchema.getFields()) {
+ if (fs.type==DataType.NULL) {
+ fs.type = DataType.BYTEARRAY;
+ }
planSchema.addField(fs);
+ }
} else {
// if any plan do not have schema, the whole LOGenerate do not have schema
if (expSchema==null) {
@@ -310,4 +319,8 @@ public class LOGenerate extends LogicalR
super.resetSchema();
outputPlanSchemas = null;
}
+
+ public List<LogicalSchema> getExpSchemas() {
+ return expSchemas;
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java Wed Feb 22 09:43:41 2017
@@ -38,6 +38,7 @@ public class LOJoin extends LogicalRelat
*/
public static enum JOINTYPE {
HASH, // Hash Join
+ BLOOM, // Bloom Join
REPLICATED, // Fragment Replicated join
SKEWED, // Skewed Join
MERGE, // Sort Merge Join
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Wed Feb 22 09:43:41 2017
@@ -1414,7 +1414,7 @@ public class LogToPhyTranslationVisitor
return;
}
- else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
+ else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH || loj.getJoinType() == LOJoin.JOINTYPE.BLOOM){
POPackage poPackage = compileToLR_GR_PackTrio(loj, loj.getCustomPartitioner(), innerFlags, loj.getExpressionPlans());
POForEach fe = compileFE4Flattening(innerFlags, scope, parallel, alias, location, inputs);
currentPlan.add(fe);
@@ -1425,7 +1425,20 @@ public class LogToPhyTranslationVisitor
e.getErrorCode(),e.getErrorSource(),e);
}
logToPhyMap.put(loj, fe);
- poPackage.getPkgr().setPackageType(PackageType.JOIN);
+ if (loj.getJoinType() == LOJoin.JOINTYPE.BLOOM) {
+ if (innerFlags.length == 2) {
+ if (innerFlags[0] == false && innerFlags[1] == false) {
+ throw new LogicalToPhysicalTranslatorException(
+ "Error at " + loj.getLocation() + " with alias "+ loj.getAlias() +
+ ". Bloom join cannot be used with a FULL OUTER join.",
+ 1109,
+ PigException.INPUT);
+ }
+ }
+ poPackage.getPkgr().setPackageType(PackageType.BLOOMJOIN);
+ } else {
+ poPackage.getPkgr().setPackageType(PackageType.JOIN);
+ }
}
translateSoftLinks(loj);
}
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Wed Feb 22 09:43:41 2017
@@ -48,6 +48,7 @@ import org.apache.pig.newplan.logical.vi
import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
import org.apache.pig.newplan.logical.visitor.DanglingNestedNodeRemover;
import org.apache.pig.newplan.logical.visitor.DuplicateForEachColumnRewriteVisitor;
+import org.apache.pig.newplan.logical.visitor.ForEachUserSchemaVisitor;
import org.apache.pig.newplan.logical.visitor.ImplicitSplitInsertVisitor;
import org.apache.pig.newplan.logical.visitor.InputOutputFileValidatorVisitor;
import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
@@ -175,6 +176,7 @@ public class LogicalPlan extends BaseOpe
new ColumnAliasConversionVisitor(this).visit();
new SchemaAliasVisitor(this).visit();
new ScalarVisitor(this, pigContext, scope).visit();
+ new ForEachUserSchemaVisitor(this).visit();
// ImplicitSplitInsertVisitor has to be called before
// DuplicateForEachColumnRewriteVisitor. Detail at pig-1766
@@ -189,6 +191,15 @@ public class LogicalPlan extends BaseOpe
new TypeCheckingRelVisitor( this, collector).visit();
+
+ new UnionOnSchemaSetter(this).visit();
+ new CastLineageSetter(this, collector).visit();
+ new ScalarVariableValidator(this).visit();
+ new StoreAliasSetter(this).visit();
+
+ // compute whether output data is sorted or not
+ new SortInfoSetter(this).visit();
+
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
if(aggregateWarning) {
@@ -199,14 +210,6 @@ public class LogicalPlan extends BaseOpe
}
}
- new UnionOnSchemaSetter(this).visit();
- new CastLineageSetter(this, collector).visit();
- new ScalarVariableValidator(this).visit();
- new StoreAliasSetter(this).visit();
-
- // compute whether output data is sorted or not
- new SortInfoSetter(this).visit();
-
if (!(skipInputOutputValidation || pigContext.inExplain || pigContext.inDumpSchema)) {
// Validate input/output file
new InputOutputFileValidatorVisitor(this, pigContext).visit();
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Wed Feb 22 09:43:41 2017
@@ -150,6 +150,39 @@ public class LogicalSchema {
}
return true;
}
+
+ // Returns true when fs1 and fs2 have the same type; for complex types the inner schemas are compared field by field, recursively. Only types are compared.
+ public static boolean typeMatch(LogicalFieldSchema fs1, LogicalFieldSchema fs2) {
+ if (fs1==null && fs2==null) {
+ return true;
+ }
+ if (fs1==null || fs2==null) {
+ return false;
+ }
+ if (fs1.type!=fs2.type) {
+ return false;
+ }
+ if (DataType.isComplex(fs1.type)) {
+ LogicalSchema s1 = fs1.schema;
+ LogicalSchema s2 = fs2.schema;
+ if (s1==null && s2==null) {
+ return true;
+ }
+ if (s1==null || s2==null) { // exactly one inner schema is missing: no match (also guards the size() calls below against null)
+ return false;
+ }
+ if (s1.size()!=s2.size()) {
+ return false;
+ }
+ for (int i=0;i<s1.size();i++) {
+ if (!typeMatch(s1.getField(i), s2.getField(i))) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
/**
* Adds the uid from FieldSchema argument to this FieldSchema
* If the argument is null, it stamps this FieldSchema with uid
@@ -447,7 +480,23 @@ public class LogicalSchema {
LogicalFieldSchema mergedFS = new LogicalFieldSchema(mergedAlias, mergedSubSchema, mergedType);
return mergedFS;
}
-
+
+ public static boolean isEqualUnlessUnknown(LogicalFieldSchema fs1, LogicalFieldSchema fs2) throws FrontendException {
+ if (fs1.type == DataType.BYTEARRAY) {
+ return true;
+ } else if (fs2.type == DataType.BYTEARRAY) {
+ return true;
+ } else if (fs1.type == fs2.type) {
+ if (DataType.isComplex(fs1.type)) {
+ return LogicalSchema.isEqualUnlessUnknown(fs1.schema, fs2.schema);
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
/***
* Old Pig field schema does not require a tuple schema inside a bag;
* Now it is required to have that; this method is to fill the gap
@@ -770,7 +819,24 @@ public class LogicalSchema {
}
return mergedSchema;
}
-
+
+ public static boolean isEqualUnlessUnknown(LogicalSchema s1, LogicalSchema s2) throws FrontendException { // positional, field-by-field comparison of two schemas
+ if (s1 == null) { // a missing (null) schema is treated as matching anything
+ return true;
+ } else if (s2 == null) {
+ return true;
+ } else if (s1.size() != s2.size()) {
+ return false;
+ } else {
+ for (int i=0;i<s1.size();i++) {
+ if (!LogicalFieldSchema.isEqualUnlessUnknown(s1.getField(i), s2.getField(i))) { // compare the i-th field of s1 against the i-th field of s2
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
public String toString(boolean verbose) {
StringBuilder str = new StringBuilder();
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java Wed Feb 22 09:43:41 2017
@@ -95,7 +95,7 @@ public class AddForEach extends WholePla
}
Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
- if (outputUids==null)
+ if (outputUids==null || outputUids.size() == 0 )
return false;
LogicalSchema schema = op.getSchema();
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java Wed Feb 22 09:43:41 2017
@@ -107,7 +107,7 @@ public class CastLineageSetter extends A
if(inLoadFunc == null){
String msg = "Cannot resolve load function to use for casting from " +
DataType.findTypeName(inType) + " to " +
- DataType.findTypeName(outType) + ". ";
+ DataType.findTypeName(outType) + " at " + cast.getLocation() ;
msgCollector.collect(msg, MessageType.Warning,
PigWarning.NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY);
}else {
Added: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java (added)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+public class ForEachUserSchemaVisitor extends LogicalRelationalNodesVisitor {
+ /**
+ * Creates a visitor over {@code plan} that walks it with a
+ * {@link DependencyOrderWalker}.
+ *
+ * @param plan the operator plan to visit
+ * @throws FrontendException declared here; presumably propagated from the
+ * superclass or walker construction (not visible in this file)
+ */
+ public ForEachUserSchemaVisitor(OperatorPlan plan) throws FrontendException {
+ super(plan, new DependencyOrderWalker(plan));
+ }
+
+    /**
+     * Combines an expression schema with a user defined schema, field by field.
+     * When only one of the two schemas is present, a deep copy of that schema is
+     * returned; when neither is present the result is {@code null}. Otherwise
+     * each position of {@code originalSchema} is paired with the field at the
+     * same position of {@code userSchema} and resolved through
+     * {@link #replaceNullByteArrayFieldSchema}.
+     *
+     * @param originalSchema schema derived from the expression, may be null
+     * @param userSchema schema declared by the user, may be null
+     * @return a newly built schema, or null when both inputs are null
+     * @throws FrontendException if resolving one of the fields fails
+     */
+    private static LogicalSchema replaceNullByteArraySchema(
+            LogicalSchema originalSchema,
+            LogicalSchema userSchema) throws FrontendException {
+        if (originalSchema == null) {
+            // Nothing to merge against: fall back to the user schema, if any.
+            return (userSchema == null) ? null : userSchema.deepCopy();
+        }
+        if (userSchema == null) {
+            return originalSchema.deepCopy();
+        }
+
+        final LogicalSchema merged = new LogicalSchema();
+        final int fieldCount = originalSchema.size();
+        int pos = 0;
+        while (pos < fieldCount) {
+            // The walk is driven by the original schema; the user schema is
+            // read at the same position.
+            merged.addField(replaceNullByteArrayFieldSchema(
+                    originalSchema.getField(pos), userSchema.getField(pos)));
+            pos++;
+        }
+        return merged;
+    }
+
+    /**
+     * Resolves a single field of the expression schema against the matching
+     * field of the user defined schema.
+     * <ul>
+     * <li>If only one side is present, a deep copy of it is returned (null when
+     * both are absent).</li>
+     * <li>If the original type is NULL or BYTEARRAY, the user field wins.</li>
+     * <li>If the user type is NULL or BYTEARRAY, the original type and schema
+     * are kept but the user's alias is used.</li>
+     * <li>Otherwise a non-schema original type yields a copy of the user field,
+     * and a schema type recurses into the nested schemas, keeping the user's
+     * alias and type.</li>
+     * </ul>
+     *
+     * @param originalFS field schema derived from the expression, may be null
+     * @param userFS field schema declared by the user, may be null
+     * @return the resolved field schema, or null when both inputs are null
+     * @throws FrontendException if resolving a nested schema fails
+     */
+    private static LogicalFieldSchema replaceNullByteArrayFieldSchema(
+            LogicalFieldSchema originalFS,
+            LogicalFieldSchema userFS) throws FrontendException {
+        if (originalFS == null) {
+            return (userFS == null) ? null : userFS.deepCopy();
+        }
+        if (userFS == null) {
+            return originalFS.deepCopy();
+        }
+
+        final boolean originalUntyped =
+                originalFS.type == DataType.NULL || originalFS.type == DataType.BYTEARRAY;
+        if (originalUntyped) {
+            return userFS.deepCopy();
+        }
+
+        final boolean userUntyped =
+                userFS.type == DataType.NULL || userFS.type == DataType.BYTEARRAY;
+        if (userUntyped) {
+            // Use originalFS schema but keep the alias from userFS
+            return new LogicalFieldSchema(userFS.alias, originalFS.schema, originalFS.type);
+        }
+
+        if (DataType.isSchemaType(originalFS.type)) {
+            LogicalSchema nested = replaceNullByteArraySchema(originalFS.schema, userFS.schema);
+            return new LogicalFieldSchema(userFS.alias, nested, userFS.type);
+        }
+        return userFS.deepCopy();
+    }
+
+    /**
+     * Tells whether a field schema carries no concrete type information, i.e.
+     * every non-schema field reachable from it is of type NULL or BYTEARRAY.
+     * A schema-typed field with a null nested schema counts as "only
+     * null/bytearray".
+     *
+     * @param fs the field schema to inspect
+     * @return true if no field at or below {@code fs} has a type other than
+     * NULL or BYTEARRAY (schema types themselves are looked through)
+     */
+    private static boolean hasOnlyNullOrByteArraySchema (LogicalFieldSchema fs) {
+        if (!DataType.isSchemaType(fs.type)) {
+            return fs.type == DataType.NULL || fs.type == DataType.BYTEARRAY;
+        }
+        if (fs.schema == null) {
+            return true;
+        }
+        for (LogicalFieldSchema child : fs.schema.getFields()) {
+            if (!hasOnlyNullOrByteArraySchema(child)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Applies the user defined schemas of a foreach by appending a second
+     * "caster" foreach that casts the affected output columns.
+     * <p>
+     * For every generate expression the expression schema is compared with the
+     * user defined schema. One {@link LOInnerLoad} plus projection is created
+     * per output column of the caster foreach; the projection is wrapped in a
+     * {@link CastExpression} when the user defined field carries a concrete
+     * type that does not match the expression's type. If at least one cast is
+     * required, the caster foreach is inserted right after {@code foreach} and
+     * the leaf types of the user defined schemas held by the original generate
+     * are reset to NULL so that only the user defined names remain there.
+     * Nothing is changed when the foreach has no schema, has no user defined
+     * schema, or needs no cast.
+     *
+     * @param foreach the foreach operator being visited
+     * @throws FrontendException if the number of expression schemas differs
+     * from the number of user defined schemas, or if an expression schema and
+     * its user defined schema have different numbers of fields
+     */
+    @Override
+    public void visit(LOForEach foreach) throws FrontendException {
+        LOGenerate generate = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
+        List<LogicalSchema> mExpSchemas = generate.getExpSchemas();
+        List<LogicalSchema> mUserDefinedSchemas = generate.getUserDefinedSchema();
+
+        // Skip if no way to figure out schema (usually both expression schema and
+        // user defined schema are null)
+        if (foreach.getSchema()==null) {
+            return;
+        }
+
+        if (mUserDefinedSchemas==null) {
+            return;
+        }
+
+        boolean hasUserDefinedSchema = false;
+        for (LogicalSchema mUserDefinedSchema : mUserDefinedSchemas) {
+            if (mUserDefinedSchema!=null) {
+                hasUserDefinedSchema = true;
+                break;
+            }
+        }
+
+        if (!hasUserDefinedSchema) {
+            return;
+        }
+
+        if (mExpSchemas.size()!=mUserDefinedSchemas.size()) {
+            throw new FrontendException("Size mismatch: Get " + mExpSchemas.size() +
+                    " mExpSchemas, but " + mUserDefinedSchemas.size() + " mUserDefinedSchemas",
+                    0, generate.getLocation());
+        }
+
+        // Build the caster foreach up front; it is only wired into the plan
+        // further down if at least one column actually needs a cast.
+        LogicalPlan innerPlan = new LogicalPlan();
+        LOForEach casterForEach = new LOForEach(plan);
+        casterForEach.setInnerPlan(innerPlan);
+        casterForEach.setAlias(foreach.getAlias());
+
+        List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
+        LOGenerate gen = new LOGenerate(innerPlan, exps, null);
+        innerPlan.add(gen);
+
+        // index counts the output columns produced so far across all expressions
+        int index = 0;
+        boolean needCast = false;
+        for(int i=0;i<mExpSchemas.size();i++) {
+            LogicalSchema mExpSchema = mExpSchemas.get(i);
+            LogicalSchema mUserDefinedSchema = mUserDefinedSchemas.get(i);
+
+            // Use user defined schema to cast, this is the prevailing use case
+            if (mExpSchema==null) {
+                // NOTE(review): assumes the user defined schema is non-null
+                // whenever the expression schema is null (otherwise this
+                // dereference throws a NullPointerException) - presumably
+                // guaranteed by the foreach.getSchema() check above; confirm.
+                for (LogicalFieldSchema fs : mUserDefinedSchema.getFields()) {
+                    if (hasOnlyNullOrByteArraySchema(fs)) {
+                        addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                    } else {
+                        addToExps(casterForEach, innerPlan, gen, exps, index, true, fs);
+                        needCast = true;
+                    }
+                    index++;
+                }
+                continue;
+            }
+
+            // No user defined schema, no need to cast
+            if (mUserDefinedSchema==null) {
+                for (int j=0;j<mExpSchema.size();j++) {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                    index++;
+                }
+                continue;
+            }
+
+            // Expression has schema, but user also define schema, need cast only
+            // when there is a mismatch
+            if (mExpSchema.size()!=mUserDefinedSchema.size()) {
+                throw new FrontendException("Size mismatch: Cannot cast " + mExpSchema.size() +
+                        " fields to " + mUserDefinedSchema.size(), 0, foreach.getLocation());
+            }
+
+            LogicalSchema replacedSchema = replaceNullByteArraySchema(mExpSchema,mUserDefinedSchema);
+            for (int j=0;j<mExpSchema.size();j++) {
+                LogicalFieldSchema mExpFieldSchema = mExpSchema.getField(j);
+                LogicalFieldSchema mUserDefinedFieldSchema = replacedSchema.getField(j);
+
+                if (hasOnlyNullOrByteArraySchema(mUserDefinedFieldSchema) ||
+                        LogicalFieldSchema.typeMatch(mExpFieldSchema, mUserDefinedFieldSchema)) {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                } else {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, true, mUserDefinedFieldSchema);
+                    needCast = true;
+                }
+                index++;
+            }
+        }
+
+        // One (default false) flatten flag per generated column
+        gen.setFlattenFlags(new boolean[index]);
+        if (needCast) {
+            // Insert the casterForEach into the plan and patch up the plan.
+            List<Operator> successorOps = plan.getSuccessors(foreach);
+            if (successorOps != null && successorOps.size() > 0){
+                // Reuse the list fetched above instead of querying the plan again
+                Operator next = successorOps.get(0);
+                plan.insertBetween(foreach, casterForEach, next);
+            }else{
+                plan.add(casterForEach);
+                plan.connect(foreach,casterForEach);
+            }
+
+            // Since the explicit cast is now inserted after the original foreach,
+            // throwing away the user defined "types" but keeping the user
+            // defined names from the original foreach.
+            // 'generate' (LOGenerate) still holds the reference to this
+            // mUserDefinedSchemas
+            for( LogicalSchema mUserDefinedSchema : mUserDefinedSchemas ) {
+                resetTypeToNull( mUserDefinedSchema );
+            }
+        }
+    }
+
+    /**
+     * Recursively sets the type of every non-schema field in {@code s1} to
+     * {@link DataType#NULL}, modifying the schema in place. Fields of a schema
+     * type keep their type and are descended into instead. A null schema is
+     * ignored.
+     *
+     * @param s1 the schema whose leaf types are reset, may be null
+     */
+    private void resetTypeToNull (LogicalSchema s1) {
+        if (s1 == null) {
+            return;
+        }
+        for (LogicalFieldSchema field : s1.getFields()) {
+            if (!DataType.isSchemaType(field.type)) {
+                field.type = DataType.NULL;
+                continue;
+            }
+            resetTypeToNull(field.schema);
+        }
+    }
+
+ /**
+ * Adds one output column to the caster foreach: an {@link LOInnerLoad} for
+ * column {@code index} connected to {@code gen}, and an expression plan
+ * projecting that column, optionally wrapped in a cast.
+ *
+ * @param casterForEach the foreach the inner load is created for
+ * @param innerPlan inner plan of {@code casterForEach}; receives the inner load
+ * @param gen the generate operator the inner load is connected to
+ * @param exps list the new expression plan is appended to
+ * @param index column number used for both the inner load and the projection
+ * @param needCaster whether the projection is wrapped in a {@link CastExpression}
+ * @param fs target field schema of the cast; only read when {@code needCaster}
+ * is true (callers pass null otherwise)
+ */
+ private void addToExps(LOForEach casterForEach, LogicalPlan innerPlan, LOGenerate gen,
+ List<LogicalExpressionPlan> exps, int index, boolean needCaster, LogicalFieldSchema fs) {
+
+ // Feed column 'index' of the caster foreach's input into the generate
+ LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, casterForEach, index);
+ innerPlan.add(innerLoad);
+ innerPlan.connect(innerLoad, gen);
+
+ LogicalExpressionPlan exp = new LogicalExpressionPlan();
+
+ // NOTE(review): arguments presumably mean (plan, input number, column, attached operator) - confirm against ProjectExpression
+ ProjectExpression prj = new ProjectExpression(exp, index, 0, gen);
+ exp.add(prj);
+
+ if (needCaster) {
+ // Cast to a copy of the requested field schema
+ CastExpression cast = new CastExpression(exp, prj, new LogicalSchema.LogicalFieldSchema(fs));
+ exp.add(cast);
+ }
+ exps.add(exp);
+ }
+}
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java Wed Feb 22 09:43:41 2017
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
@@ -729,6 +730,44 @@ public class LineageFindRelVisitor exten
}
}
+ /**
+ * Determines which function spec to associate with the output of a user
+ * defined function and records it for the output field's uid (and for the
+ * uids of its nested schema).
+ * <p>
+ * If {@code instantiateCaster} yields a result for the UDF's own func spec,
+ * that func spec is used. Otherwise the func spec mapped to the first
+ * argument's uid is used, but only if every remaining argument has a field
+ * schema and passes {@code haveIdenticalCasters} against it. Nothing is
+ * recorded when the expression has no field schema or no func spec could be
+ * determined.
+ *
+ * @param op the user function expression being visited
+ * @throws FrontendException declared by this visitor method; presumably
+ * raised by the schema or caster lookups - confirm
+ */
+ @Override
+ public void visit(UserFuncExpression op) throws FrontendException {
+
+ if( op.getFieldSchema() == null ) {
+ return;
+ }
+
+ FuncSpec funcSpec = null;
+ Class loader = instantiateCaster(op.getFuncSpec());
+ List<LogicalExpression> arguments = op.getArguments();
+ if ( loader != null ) {
+ // if evalFunc.getLoadCaster() returns, simply use that.
+ funcSpec = op.getFuncSpec();
+ } else if (arguments.size() != 0 ) {
+ // Otherwise fall back to the func spec recorded for the first argument
+ FuncSpec baseFuncSpec = null;
+ LogicalFieldSchema fs = arguments.get(0).getFieldSchema();
+ if ( fs != null ) {
+ baseFuncSpec = uid2LoadFuncMap.get(fs.uid);
+ if( baseFuncSpec != null ) {
+ funcSpec = baseFuncSpec;
+ // ... but give up as soon as any other argument disagrees
+ for(int i = 1; i < arguments.size(); i++) {
+ fs = arguments.get(i).getFieldSchema();
+ if( fs == null || !haveIdenticalCasters(baseFuncSpec, uid2LoadFuncMap.get(fs.uid)) ) {
+ funcSpec = null;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if( funcSpec != null ) {
+ addUidLoadFuncToMap(op.getFieldSchema().uid, funcSpec);
+ // in case schema is nested, set funcSpec for all
+ setLoadFuncForUids(op.getFieldSchema().schema, funcSpec);
+ }
+ }
+
/**
* if there is a null constant under casts, return it
* @param rel
@@ -770,6 +809,8 @@ public class LineageFindRelVisitor exten
caster = ((LoadFunc)obj).getLoadCaster();
} else if (obj instanceof StreamToPig) {
caster = ((StreamToPig)obj).getLoadCaster();
+ } else if (obj instanceof EvalFunc) {
+ caster = ((EvalFunc)obj).getLoadCaster();
} else {
throw new VisitorException("Invalid class type " + funcSpec.getClassName(),
2270, PigException.BUG );
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java Wed Feb 22 09:43:41 2017
@@ -458,6 +458,7 @@ public class TypeCheckingExpVisitor exte
collectCastWarning(node, arg.getType(), toFs.type, msgCollector);
CastExpression cast = new CastExpression(plan, arg, toFs);
+ cast.setLocation(node.getLocation());
try {
// disconnect cast and arg because the connection is already
// added by cast constructor and insertBetween call is going
@@ -490,7 +491,7 @@ public class TypeCheckingExpVisitor exte
byte outType = cast.getType();
if(outType == DataType.BYTEARRAY && inType != outType) {
int errCode = 1051;
- String msg = "Cannot cast to bytearray";
+ String msg = "Cannot cast from " + DataType.findTypeName(inType) + " to bytearray";
msgCollector.collect(msg, MessageType.Error) ;
throw new TypeCheckerException(cast, msg, errCode, PigException.INPUT) ;
}
@@ -607,7 +608,7 @@ public class TypeCheckingExpVisitor exte
// Matching schemas if we're working with tuples/bags
if (DataType.isSchemaType(lhsType)) {
try {
- if(! binCond.getLhs().getFieldSchema().isEqual(binCond.getRhs().getFieldSchema())){
+ if(!LogicalFieldSchema.isEqualUnlessUnknown(binCond.getLhs().getFieldSchema(), binCond.getRhs().getFieldSchema())){
int errCode = 1048;
String msg = "Two inputs of BinCond must have compatible schemas."
+ " left hand side: " + binCond.getLhs().getFieldSchema()
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java Wed Feb 22 09:43:41 2017
@@ -351,7 +351,8 @@ public class TypeCheckingRelVisitor exte
if (outFieldSchema.type != fs.type) {
castNeededCounter++ ;
- new CastExpression(genPlan, project, outFieldSchema);
+ CastExpression castexp = new CastExpression(genPlan, project, outFieldSchema);
+ castexp.setLocation(toOp.getLocation());
}
generatePlans.add(genPlan) ;
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java Wed Feb 22 09:43:41 2017
@@ -21,6 +21,7 @@ package org.apache.pig.newplan.logical.v
import java.util.ArrayList;
import java.util.List;
+import org.apache.pig.PigException;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.Pair;
@@ -110,9 +111,20 @@ public class UnionOnSchemaSetter extends
} else {
ProjectExpression projExpr =
new ProjectExpression( exprPlan, genInputs.size(), 0, gen );
- if( fs.type != DataType.BYTEARRAY
- && opSchema.getField( pos ).type != fs.type ) {
- new CastExpression( exprPlan, projExpr, fs );
+ if( opSchema.getField( pos ).type != fs.type ) {
+ if( fs.type != DataType.BYTEARRAY ) {
+ CastExpression castexpr = new CastExpression( exprPlan, projExpr, fs );
+ castexpr.setLocation(union.getLocation());
+ } else {
+ int errCode = 1056;
+ String msg = "Union of incompatible types not allowed. "
+ + "Cannot cast from "
+ + DataType.findTypeName(opSchema.getField( pos ).type)
+ + " to bytearray for '"
+ + opSchema.getField( pos ).alias
+ + "'. Please typecast to compatible types before union." ;
+ throw new FrontendException(union, msg, errCode, PigException.INPUT) ;
+ }
}
genInputs.add( new LOInnerLoad( innerPlan, foreach, pos ) );
}
Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Wed Feb 22 09:43:41 2017
@@ -34,6 +34,7 @@ import org.antlr.runtime.RecognitionExce
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadFunc;
+import org.apache.pig.NonFSLoadFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
@@ -888,7 +889,7 @@ public class LogicalPlanBuilder {
if (absolutePath == null) {
absolutePath = loFunc.relativeToAbsolutePath( filename, QueryParserUtils.getCurrentDir( pigContext ) );
- if (absolutePath!=null) {
+ if (absolutePath!=null && !(loFunc instanceof NonFSLoadFunc)) {
QueryParserUtils.setHdfsServers( absolutePath, pigContext );
}
fileNameMap.put( fileNameKey, absolutePath );
@@ -1357,13 +1358,19 @@ public class LogicalPlanBuilder {
return Long.parseLong( num );
}
+ /**
+ * Parse big integer formatted string (e.g. "1234567890123BI") into BigInteger object
+ */
static BigInteger parseBigInteger(String s) {
- String num = s.substring( 0, s.length() - 1 );
+ String num = s.substring( 0, s.length() - 2 );
return new BigInteger( num );
}
+ /**
+ * Parse big decimal formatted string (e.g. "123456.7890123BD") into BigDecimal object
+ */
static BigDecimal parseBigDecimal(String s) {
- String num = s.substring( 0, s.length() - 1 );
+ String num = s.substring( 0, s.length() - 2 );
return new BigDecimal( num );
}
@@ -1781,6 +1788,8 @@ public class LogicalPlanBuilder {
return JOINTYPE.REPLICATED;
} else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
return LOJoin.JOINTYPE.HASH;
+ } else if( modifier.equalsIgnoreCase( "bloom" ) ) {
+ return LOJoin.JOINTYPE.BLOOM;
} else if( modifier.equalsIgnoreCase( "skewed" ) ) {
return JOINTYPE.SKEWED;
} else if (modifier.equalsIgnoreCase("merge")) {
@@ -1789,7 +1798,7 @@ public class LogicalPlanBuilder {
return JOINTYPE.MERGESPARSE;
} else {
throw new ParserValidationException( intStream, loc,
- "Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
+ "Only REPL, REPLICATED, HASH, BLOOM, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
}
}
Modified: pig/branches/spark/src/org/apache/pig/parser/PigMacro.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/PigMacro.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/PigMacro.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/PigMacro.java Wed Feb 22 09:43:41 2017
@@ -168,14 +168,9 @@ class PigMacro {
Map<String, String> paramVal = pc.getParamVal();
for (Map.Entry<String, String> e : pigContext.getParamVal().entrySet()) {
- if (paramVal.containsKey(e.getKey())) {
- throw new ParserException(
- "Macro contains argument or return value " + e.getKey() + " which conflicts " +
- "with a Pig parameter of the same name."
- );
- } else {
- paramVal.put(e.getKey(), e.getValue());
- }
+ // overwrite=false since macro parameters should have precedence
+ // over commandline parameters (if keys overlap)
+ pc.processOrdLine(e.getKey(), e.getValue(), false);
}
ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(pc);
@@ -219,6 +214,7 @@ class PigMacro {
try {
result = parser.query();
} catch (RecognitionException e) {
+ e.line += startLine -1;
String msg = (fileName == null) ? parser.getErrorHeader(e)
: QueryParserUtils.generateErrorHeader(e, fileName);
msg += " " + parser.getErrorMessage(e, parser.getTokenNames());
@@ -236,7 +232,7 @@ class PigMacro {
if (!macroDefNodes.isEmpty()) {
String fname = ((PigParserNode)ast).getFileName();
String msg = getErrorMessage(fname, ast.getLine(),
- "Invalide macro definition", "macro '" + name
+ "Invalid macro definition", "macro '" + name
+ "' contains macro definition.\nmacro content: "
+ body);
throw new ParserException(msg);
@@ -273,6 +269,7 @@ class PigMacro {
try {
result2 = walker.query();
} catch (RecognitionException e) {
+ e.line += startLine - 1;
String msg = walker.getErrorHeader(e) + " "
+ walker.getErrorMessage(e, walker.getTokenNames());
String msg2 = getErrorMessage(file, line, "Failed to mask macro '"
Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParser.g?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/QueryParser.g Wed Feb 22 09:43:41 2017
@@ -889,6 +889,8 @@ scalar : INTEGER
| LONGINTEGER
| FLOATNUMBER
| DOUBLENUMBER
+ | BIGINTEGERNUMBER
+ | BIGDECIMALNUMBER
| QUOTEDSTRING
| NULL
| TRUE
Modified: pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java Wed Feb 22 09:43:41 2017
@@ -23,6 +23,10 @@ import java.net.URISyntaxException;
import org.apache.pig.PigServer;
import org.apache.pig.tools.DownloadResolver;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.hadoop.fs.Path;
public class RegisterResolver {
@@ -66,15 +70,24 @@ public class RegisterResolver {
String scheme = uri.getScheme();
if (scheme != null) {
scheme = scheme.toLowerCase();
+ if (scheme.equals("ivy")) {
+ DownloadResolver downloadResolver = DownloadResolver.getInstance();
+ return downloadResolver.downloadArtifact(uri, pigServer);
+ }
+ if (!hasFileSystemImpl(uri)) {
+ throw new ParserException("Invalid Scheme: " + uri.getScheme());
+ }
}
- if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) {
- return new URI[] { uri };
- } else if (scheme.equals("ivy")) {
- DownloadResolver downloadResolver = DownloadResolver.getInstance();
- return downloadResolver.downloadArtifact(uri, pigServer);
- } else {
- throw new ParserException("Invalid Scheme: " + uri.getScheme());
- }
+ return new URI[] { uri };
+ }
+
+ /**
+ * Checks whether a file system implementation is available for the given
+ * URI. The check is delegated to {@code HadoopShims.hasFileSystemImpl},
+ * using a Hadoop configuration built from the properties of the
+ * PigServer's PigContext.
+ *
+ * @param uri the URI to check
+ * @return True if the uri has valid file system implementation
+ */
+ private boolean hasFileSystemImpl(URI uri) {
+ Configuration conf = ConfigurationUtil.toConfiguration(pigServer.getPigContext().getProperties(), true);
+ return HadoopShims.hasFileSystemImpl(new Path(uri), conf);
}
/**
Modified: pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java Wed Feb 22 09:43:41 2017
@@ -75,12 +75,11 @@ public class SourceLocation {
if (node != null) {
InvocationPoint pt = node.getNextInvocationPoint();
while (pt != null) {
- sb.append("\n");
sb.append("at expanding macro '" + pt.getMacro() + "' ("
+ pt.getFile() + ":" + pt.getLine() + ")");
pt = node.getNextInvocationPoint();
+ sb.append("\n");
}
- sb.append("\n");
}
sb.append( "<" );
if( file != null && !file.isEmpty() )
Modified: pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java (original)
+++ pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java Wed Feb 22 09:43:41 2017
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.Mapper;
@@ -35,6 +36,7 @@ import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
@@ -75,9 +77,9 @@ import org.apache.pig.pen.util.LineageTr
*
*/
public class LocalMapReduceSimulator {
-
+
private MapReduceLauncher launcher = new MapReduceLauncher();
-
+
private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();;
@SuppressWarnings("unchecked")
@@ -88,12 +90,12 @@ public class LocalMapReduceSimulator {
PigContext pc) throws PigException, IOException, InterruptedException {
phyToMRMap.clear();
MROperPlan mrp = launcher.compile(php, pc);
-
+
ConfigurationValidator.validatePigProperties(pc.getProperties());
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-
+
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-
+
JobControl jc;
int numMRJobsCompl = 0;
DataBag input;
@@ -106,6 +108,8 @@ public class LocalMapReduceSimulator {
boolean needFileInput;
final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>();
pc.getProperties().setProperty("pig.illustrating", "true");
+ String jtIdentifier = "" + System.currentTimeMillis();
+ int jobId = 0;
while(mrp.size() != 0) {
jc = jcc.compile(mrp, "Illustrator");
if(jc == null) {
@@ -113,6 +117,7 @@ public class LocalMapReduceSimulator {
}
List<Job> jobs = jc.getWaitingJobs();
for (Job job : jobs) {
+ jobId++;
jobConf = job.getJobConf();
FileLocalizer.setInitialized(false);
ArrayList<ArrayList<OperatorKey>> inpTargets =
@@ -123,14 +128,14 @@ public class LocalMapReduceSimulator {
PigSplit split = null;
List<POStore> stores = null;
PhysicalOperator pack = null;
- // revisit as there are new physical operators from MR compilation
+ // revisit as there are new physical operators from MR compilation
if (!mro.mapPlan.isEmpty())
attacher.revisit(mro.mapPlan);
if (!mro.reducePlan.isEmpty()) {
attacher.revisit(mro.reducePlan);
pack = mro.reducePlan.getRoots().get(0);
}
-
+
List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
if (!mro.mapPlan.isEmpty()) {
stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
@@ -145,10 +150,10 @@ public class LocalMapReduceSimulator {
for (POStore store : stores) {
output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store));
}
-
+
OutputAttacher oa = new OutputAttacher(mro.mapPlan, output);
oa.visit();
-
+
if (!mro.reducePlan.isEmpty()) {
oa = new OutputAttacher(mro.reducePlan, output);
oa.visit();
@@ -168,6 +173,7 @@ public class LocalMapReduceSimulator {
if (input != null)
mro.mapPlan.remove(ld);
}
+ int mapTaskId = 0;
for (POLoad ld : lds) {
// check newly generated data first
input = output.get(ld.getLFile().getFileName());
@@ -180,7 +186,7 @@ public class LocalMapReduceSimulator {
break;
}
}
- }
+ }
}
needFileInput = (input == null);
split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
@@ -199,6 +205,7 @@ public class LocalMapReduceSimulator {
context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split);
}
((PigMapBase) map).setMapPlan(mro.mapPlan);
+ context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
map.run(context);
} else {
if ("true".equals(jobConf.get("pig.usercomparator")))
@@ -210,10 +217,11 @@ public class LocalMapReduceSimulator {
Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map)
.getIllustratorContext(jobConf, input, intermediateData, split);
((PigMapBase) map).setMapPlan(mro.mapPlan);
+ context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
map.run(context);
}
}
-
+
if (!mro.reducePlan.isEmpty())
{
if (pack instanceof POPackage)
@@ -233,19 +241,20 @@ public class LocalMapReduceSimulator {
}
((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan);
+ context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString());
reduce.run(context);
}
for (PhysicalOperator key : mro.phyToMRMap.keySet())
for (PhysicalOperator value : mro.phyToMRMap.get(key))
phyToMRMap.put(key, value);
}
-
-
+
+
int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>());
-
+
numMRJobsCompl += removedMROp;
}
-
+
jcc.reset();
}
@@ -256,7 +265,7 @@ public class LocalMapReduceSimulator {
plan));
this.outputBuffer = output;
}
-
+
@Override
public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) {
Modified: pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java Wed Feb 22 09:43:41 2017
@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
import org.apache.hadoop.util.Shell;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStats;
/**
@@ -127,7 +128,9 @@ public abstract class ScriptEngine {
//protected static InputStream getScriptAsStream(String scriptPath) {
InputStream is = null;
File file = new File(scriptPath);
- if (file.exists()) {
+ // In the frontend give preference to the local file.
+ // In the backend, try the jar first
+ if (UDFContext.getUDFContext().isFrontend() && file.exists()) {
try {
is = new FileInputStream(file);
} catch (FileNotFoundException e) {
@@ -156,7 +159,14 @@ public abstract class ScriptEngine {
}
}
}
-
+ if (is == null && file.exists()) {
+ try {
+ is = new FileInputStream(file);
+ } catch (FileNotFoundException e) {
+ throw new IllegalStateException("could not find existing file "+scriptPath, e);
+ }
+ }
+
// TODO: discuss if we want to add logic here to load a script from HDFS
if (is == null) {
Modified: pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java Wed Feb 22 09:43:41 2017
@@ -95,7 +95,7 @@ public class JsFunction extends EvalFunc
private void debugConvertPigToJS(int depth, String pigType, Object value, Schema schema) {
if (LOG.isDebugEnabled()) {
- LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + value + " using " + stringify(schema));
+ LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + toString(value) + " using " + stringify(schema));
}
}
Modified: pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java Wed Feb 22 09:43:41 2017
@@ -54,7 +54,7 @@ public class JythonFunction extends Eval
try {
f = JythonScriptEngine.getFunction(filename, functionName);
this.function = f;
- num_parameters = ((PyBaseCode) f.func_code).co_argcount;
+ num_parameters = ((PyBaseCode) f.__code__).co_argcount;
PyObject outputSchemaDef = f.__findattr__("outputSchema".intern());
if (outputSchemaDef != null) {
this.schema = Utils.getSchemaFromString(outputSchemaDef.toString());
@@ -105,7 +105,7 @@ public class JythonFunction extends Eval
@Override
public Object exec(Tuple tuple) throws IOException {
try {
- if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.func_code).varargs)) {
+ if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.__code__).varargs)) {
// ignore input tuple
PyObject out = function.__call__();
return JythonUtils.pythonToPig(out);
Modified: pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java Wed Feb 22 09:43:41 2017
@@ -44,8 +44,6 @@ public class DownloadResolver {
private static DownloadResolver downloadResolver = new DownloadResolver();
private DownloadResolver() {
- System.setProperty("groovy.grape.report.downloads", "true");
-
if (System.getProperty("grape.config") != null) {
LOG.info("Using ivysettings file from " + System.getProperty("grape.config"));
} else {
Added: pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.grunt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.util.Enumeration;
+
+import jline.console.ConsoleReader;
+
+/**
+ * An {@link InputStream} that serves the lines read from a jline
+ * {@link ConsoleReader} as bytes, each line terminated by a newline.
+ *
+ * Borrowed from jline.console.internal.ConsoleReaderInputStream. However,
+ * we cannot use ConsoleReaderInputStream directly since:
+ * 1. ConsoleReaderInputStream is not public
+ * 2. ConsoleReaderInputStream has a bug which does not deal with UTF-8 correctly
+ */
+public class ConsoleReaderInputStream extends SequenceInputStream {
+    /** The {@link System#in} stream captured when this class was loaded. */
+    private static final InputStream systemIn = System.in;
+
+    /**
+     * Replaces {@link System#in} with a stream backed by a new {@link ConsoleReader}.
+     *
+     * @throws IOException if creating the {@link ConsoleReader} fails
+     */
+    public static void setIn() throws IOException {
+        setIn(new ConsoleReader());
+    }
+
+    /**
+     * Replaces {@link System#in} with a stream backed by the given reader.
+     *
+     * @param reader the console reader supplying the input lines
+     */
+    public static void setIn(final ConsoleReader reader) {
+        System.setIn(new ConsoleReaderInputStream(reader));
+    }
+
+    /**
+     * Restore the original {@link System#in} input stream.
+     */
+    public static void restoreIn() {
+        System.setIn(systemIn);
+    }
+
+    /**
+     * Creates a stream that reads its content line by line from the given reader.
+     *
+     * @param reader the console reader supplying the input lines
+     */
+    public ConsoleReaderInputStream(final ConsoleReader reader) {
+        super(new ConsoleEnumeration(reader));
+    }
+
+    /**
+     * Lazily supplies one {@link ConsoleLineInputStream} per console line and
+     * reports exhaustion once the last stream handed out saw a null line.
+     */
+    private static class ConsoleEnumeration implements Enumeration<InputStream> {
+        private final ConsoleReader reader;
+        private ConsoleLineInputStream next = null;
+        private ConsoleLineInputStream prev = null;
+
+        public ConsoleEnumeration(final ConsoleReader reader) {
+            this.reader = reader;
+        }
+
+        @Override
+        public InputStream nextElement() {
+            if (next != null) {
+                // hand out the stream prepared by hasMoreElements() and keep it
+                // as prev so its wasNull flag can be inspected afterwards
+                InputStream n = next;
+                prev = next;
+                next = null;
+
+                return n;
+            }
+
+            return new ConsoleLineInputStream(reader);
+        }
+
+        @Override
+        public boolean hasMoreElements() {
+            // the last line was null
+            if ((prev != null) && prev.wasNull) {
+                return false;
+            }
+
+            if (next == null) {
+                next = (ConsoleLineInputStream) nextElement();
+            }
+
+            return next != null;
+        }
+    }
+
+    /**
+     * Serves a single console line as bytes followed by a newline. The line
+     * is read from the reader on the first call to {@link #read()}.
+     */
+    private static class ConsoleLineInputStream extends InputStream {
+        private final ConsoleReader reader;
+        private byte[] buffer = null;
+        private int index = 0;
+        private boolean eol = false;
+        /** Set when the reader returned null instead of a line (end of input). */
+        protected boolean wasNull = false;
+
+        public ConsoleLineInputStream(final ConsoleReader reader) {
+            this.reader = reader;
+        }
+
+        /**
+         * Returns the next byte of the line as a value in 0..255, then a single
+         * newline, then -1. Returns -1 when the reader yields null instead of a line.
+         */
+        @Override
+        public int read() throws IOException {
+            if (eol) {
+                return -1;
+            }
+
+            if (buffer == null) {
+                // readLine() yields null at end of input; test for it before
+                // calling getBytes() so EOF does not turn into a NullPointerException
+                final String line = reader.readLine();
+                if (line == null) {
+                    wasNull = true;
+                    return -1;
+                }
+                // platform default charset; GruntParser decodes this stream
+                // with a default-charset InputStreamReader as well
+                buffer = line.getBytes();
+            }
+
+            if (index >= buffer.length) {
+                eol = true;
+                return '\n'; // lines are ended with a newline
+            }
+
+            // mask to 0..255: bytes of multi-byte characters are negative as
+            // a Java byte and must not be returned as negative ints
+            return buffer[index++] & 0xFF;
+        }
+    }
+}
\ No newline at end of file
Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java Wed Feb 22 09:43:41 2017
@@ -20,7 +20,7 @@ package org.apache.pig.tools.grunt;
import java.io.BufferedReader;
import java.util.ArrayList;
-import jline.ConsoleReader;
+import jline.console.ConsoleReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,8 +52,8 @@ public class Grunt
public void setConsoleReader(ConsoleReader c)
{
- c.addCompletor(new PigCompletorAliases(pig));
- c.addCompletor(new PigCompletor());
+ c.addCompleter(new PigCompletorAliases(pig));
+ c.addCompleter(new PigCompletor());
parser.setConsoleReader(c);
}
Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java Wed Feb 22 09:43:41 2017
@@ -26,7 +26,6 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.Reader;
import java.io.StringReader;
@@ -42,8 +41,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import jline.ConsoleReader;
-import jline.ConsoleReaderInputStream;
+import jline.console.ConsoleReader;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.commons.logging.Log;
@@ -264,7 +262,7 @@ public class GruntParser extends PigScri
public void prompt()
{
if (mInteractive) {
- mConsoleReader.setDefaultPrompt("grunt> ");
+ mConsoleReader.setPrompt("grunt> ");
}
}
@@ -516,8 +514,13 @@ public class GruntParser extends PigScri
ConsoleReader reader;
boolean interactive;
- mPigServer.getPigContext().setParams(params);
- mPigServer.getPigContext().setParamFiles(files);
+ PigContext pc = mPigServer.getPigContext();
+
+ if( !loadOnly ) {
+ pc.getPreprocessorContext().paramScopePush();
+ }
+ pc.setParams(params);
+ pc.setParamFiles(files);
try {
FetchFileRet fetchFile = FileLocalizer.fetchFile(mConf, script);
@@ -528,7 +531,7 @@ public class GruntParser extends PigScri
cmds = cmds.replaceAll("\t"," ");
reader = new ConsoleReader(new ByteArrayInputStream(cmds.getBytes()),
- new OutputStreamWriter(System.out));
+ System.out);
reader.setHistory(mConsoleReader.getHistory());
InputStream in = new ConsoleReaderInputStream(reader);
inputReader = new BufferedReader(new InputStreamReader(in));
@@ -560,6 +563,9 @@ public class GruntParser extends PigScri
if (interactive) {
System.out.println("");
}
+ if( ! loadOnly ) {
+ pc.getPreprocessorContext().paramScopePop();
+ }
}
@Override
Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java Wed Feb 22 09:43:41 2017
@@ -33,9 +33,9 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import jline.Completor;
+import jline.console.completer.Completer;
-public class PigCompletor implements Completor {
+public class PigCompletor implements Completer {
private final Log log = LogFactory.getLog(getClass());
Set<String> candidates;
static final String AUTOCOMPLETE_FILENAME = "autocomplete";
Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java Wed Feb 22 09:43:41 2017
@@ -26,12 +26,11 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.pig.PigServer;
-import jline.Completor;
+import jline.console.completer.Completer;
-public class PigCompletorAliases implements Completor {
+public class PigCompletorAliases implements Completer {
private final Log log = LogFactory.getLog(getClass());
Set<String> keywords;
PigServer pig;
Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj (original)
+++ pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj Wed Feb 22 09:43:41 2017
@@ -259,8 +259,11 @@ TOKEN :
<PIGDEFAULT: "%default" >
}
+
TOKEN :
{
+ <REGISTER: "register"> : IN_REGISTER
+ |
<IDENTIFIER: (<SPECIALCHAR>)*<LETTER>(<DIGIT> | <LETTER> | <SPECIALCHAR>)*>
|
<LITERAL: ("\"" ((~["\""])*("\\\"")?)* "\"")|("'" ((~["'"])*("\\\'")?)* "'") >
@@ -276,7 +279,14 @@ TOKEN :
<OTHER: (~["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"])+ >
|
<NOT_OTHER_CHAR: ["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"] >
-
+}
+
+<IN_REGISTER> MORE : { " " | "\t" | "\r" | "\n"}
+
+<IN_REGISTER> TOKEN: {
+ <PATH: (~["(", ")", ";", "\r", " ", "\t", "\n"])+> {
+ matchedToken.image = image.toString();
+ }: DEFAULT
}
void Parse() throws IOException : {}
@@ -288,6 +298,7 @@ void input() throws IOException :
{
String s;
Token strTok = null;
+ Token strTok2 = null;
}
{
strTok = <PIG>
@@ -308,6 +319,20 @@ void input() throws IOException :
{ pc.validate(strTok.toString()); }
)
|
+ strTok = <REGISTER>
+ strTok2 = <PATH> {}
+ {
+ // Special case for "register": its argument may contain "/*" globbing,
+ // which conflicts with the general multi-line comment syntax "/* */".
+ // See the comment above on OTHER for how the tokenizer picks the longest
+ // match. Here, the string following "register" is tokenized as a PATH
+ // token, so "/*" inside it is not treated as the start of a comment
+ // (which avoids the longest-match problem).
+ out.append(strTok.image);
+ String sub_line = pc.substitute(strTok2.image);
+ out.append(sub_line);
+ }
+ |
s = paramString(){}
{
//process an ordinary pig line - perform substitution