You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC
svn commit: r1642132 [7/14] - in /pig/branches/spark: ./ bin/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/
contrib/piggybank/java/sr...
Modified: pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java Thu Nov 27 12:49:54 2014
@@ -187,19 +187,18 @@ public abstract class DefaultAbstractBag
private long totalSizeFromAvgTupleSize(long avgTupleSize, int numInMem) {
long used = avgTupleSize * numInMem;
- // add up the overhead for this object and other object variables
- int bag_fix_size = 8 /* object header */
- + 4 + 8 + 8 /* mLastContentsSize + mMemSize + mSize */
- + 8 + 8 /* mContents ref + mSpillFiles ref*/
- + 4 /* +4 to round it to eight*/
- + 36 /* mContents fixed */
- ;
long mFields_size = roundToEight(4 + numInMem*4); /* mContents fixed + per entry */
// in the Java HotSpot 32-bit VM, there seems to be a minimum bag size of 188 bytes;
// some of the extra bytes are probably from the minimum size of this array list
mFields_size = Math.max(40, mFields_size);
- used += bag_fix_size + mFields_size;
+ // the fixed overhead for this object and other object variables = 84 bytes
+ // 8 - object header
+ // 4 + 8 + 8 - sampled + aggSampleTupleSize + mSize
+ // 8 + 8 - mContents ref + mSpillFiles ref
+ // 4 - spillableRegistered (counted as 4 bytes instead of 1 so the total rounds to a multiple of eight)
+ // 36 - mContents fixed
+ used += 84 + mFields_size;
// add up overhead for mSpillFiles ArrayList, Object[] inside ArrayList,
// object variable inside ArrayList and references to spill files
@@ -444,7 +443,7 @@ public abstract class DefaultAbstractBag
if (reporter != null && reporter.getCounter(counter)!=null) {
reporter.getCounter(counter).increment(numRecsSpilled);
} else {
- PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter);
+ PigHadoopLogger.getInstance().warn(mContents, "Spill counter incremented", counter);
}
}
Modified: pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java Thu Nov 27 12:49:54 2014
@@ -45,7 +45,6 @@ import org.apache.pig.impl.util.ObjectSe
*/
public class DefaultTuple extends AbstractTuple {
- protected boolean isNull = false;
private static final long serialVersionUID = 2L;
protected List<Object> mFields;
@@ -165,11 +164,6 @@ public class DefaultTuple extends Abstra
@Override
public long getMemorySize() {
Iterator<Object> i = mFields.iterator();
- // fixed overhead
- long empty_tuple_size = 8 /* tuple object header */
- + 8 /* isNull - but rounded to 8 bytes as total obj size needs to be multiple of 8 */
- + 8 /* mFields reference */
- + 32 /* mFields array list fixed size */;
// rest of the fixed portion of mFields size is accounted for in the fixed overhead added to sum below
long mfields_var_size = SizeUtil.roundToEight(4 + 4 * mFields.size());
@@ -177,7 +171,11 @@ public class DefaultTuple extends Abstra
// which is probably from the minimum size of this array list
mfields_var_size = Math.max(40, mfields_var_size);
- long sum = empty_tuple_size + mfields_var_size;
+ // fixed overhead = 48 bytes
+ //8 - tuple object header
+ //8 - mFields reference
+ //32 - mFields array list fixed size
+ long sum = 48 + mfields_var_size;
while (i.hasNext()) {
sum += SizeUtil.getPigObjMemSize(i.next());
}
Modified: pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java Thu Nov 27 12:49:54 2014
@@ -516,19 +516,27 @@ public class DistinctDataBag extends Def
// the spill files list. So I need to append it to my
// linked list as well so that it's still there when I
// move my linked list back to the spill files.
+ DataOutputStream out = null;
try {
- DataOutputStream out = getSpillFile();
+ out = getSpillFile();
ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
Tuple t;
while ((t = readFromTree()) != null) {
t.write(out);
}
out.flush();
- out.close();
} catch (IOException ioe) {
String msg = "Unable to find our spill file.";
log.fatal(msg, ioe);
throw new RuntimeException(msg, ioe);
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+ }
+ }
}
}
// delete files that have been merged into new files
Modified: pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java Thu Nov 27 12:49:54 2014
@@ -36,6 +36,7 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigWarning;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -81,7 +82,7 @@ public class InternalDistinctBag extends
if (percent < 0) {
percent = 0.2F;
if (PigMapReduce.sJobConfInternal.get() != null) {
- String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
+ String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_MEMUSAGE);
if (usage != null) {
percent = Float.parseFloat(usage);
}
@@ -424,19 +425,25 @@ public class InternalDistinctBag extends
// the spill files list. So I need to append it to my
// linked list as well so that it's still there when I
// move my linked list back to the spill files.
+ DataOutputStream out = null;
try {
- DataOutputStream out = getSpillFile();
+ out = getSpillFile();
ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
Tuple t;
while ((t = readFromTree()) != null) {
t.write(out);
}
out.flush();
- out.close();
} catch (IOException ioe) {
String msg = "Unable to find our spill file.";
log.fatal(msg, ioe);
throw new RuntimeException(msg, ioe);
+ } finally {
+ try {
+ out.close();
+ } catch (IOException e) {
+ warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+ }
}
}
Modified: pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java Thu Nov 27 12:49:54 2014
@@ -35,6 +35,7 @@ import java.util.PriorityQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigWarning;
/**
@@ -401,19 +402,27 @@ public class InternalSortedBag extends S
// the spill files list. So I need to append it to my
// linked list as well so that it's still there when I
// move my linked list back to the spill files.
+ DataOutputStream out = null;
try {
- DataOutputStream out = getSpillFile();
+ out = getSpillFile();
ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
Tuple t;
while ((t = readFromPriorityQ()) != null) {
t.write(out);
}
out.flush();
- out.close();
} catch (IOException ioe) {
String msg = "Unable to find our spill file.";
log.fatal(msg, ioe);
throw new RuntimeException(msg, ioe);
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+ }
+ }
}
}
// delete files that have been merged into new files
Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java Thu Nov 27 12:49:54 2014
@@ -17,7 +17,7 @@
*/
package org.apache.pig.data;
-import static org.apache.pig.PigConfiguration.SHOULD_USE_SCHEMA_TUPLE;
+import static org.apache.pig.PigConfiguration.PIG_SCHEMA_TUPLE_ENABLED;
import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT;
import java.io.File;
@@ -33,7 +33,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.ExecType;
import org.apache.pig.PigConstants;
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
import org.apache.pig.data.utils.StructuresHelper.SchemaKey;
@@ -151,8 +150,8 @@ public class SchemaTupleBackend {
return;
}
// Step one is to see if there are any classes in the distributed cache
- if (!jConf.getBoolean(SHOULD_USE_SCHEMA_TUPLE, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
- LOG.info("Key [" + SHOULD_USE_SCHEMA_TUPLE +"] was not set... will not generate code.");
+ if (!jConf.getBoolean(PIG_SCHEMA_TUPLE_ENABLED, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
+ LOG.info("Key [" + PIG_SCHEMA_TUPLE_ENABLED +"] was not set... will not generate code.");
return;
}
// Step two is to copy everything from the distributed cache if we are in distributed mode
@@ -184,14 +183,22 @@ public class SchemaTupleBackend {
LOG.info("Attempting to read file: " + s);
// The string is the symlink into the distributed cache
File src = new File(s);
- FileInputStream fin = new FileInputStream(src);
- FileOutputStream fos = new FileOutputStream(new File(codeDir, s));
-
- fin.getChannel().transferTo(0, src.length(), fos.getChannel());
+ FileInputStream fin = null;
+ FileOutputStream fos = null;
+ try {
+ fin = new FileInputStream(src);
+ fos = new FileOutputStream(new File(codeDir, s));
- fin.close();
- fos.close();
- LOG.info("Successfully copied file to local directory.");
+ fin.getChannel().transferTo(0, src.length(), fos.getChannel());
+ LOG.info("Successfully copied file to local directory.");
+ } finally {
+ if (fin != null) {
+ fin.close();
+ }
+ if (fos != null) {
+ fos.close();
+ }
+ }
}
}
Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java Thu Nov 27 12:49:54 2014
@@ -61,27 +61,27 @@ public class SchemaTupleClassGenerator {
* This context is used in UDF code. Currently, this is only used for
* the inputs to UDF's.
*/
- UDF (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_UDF, true, GenerateUdf.class),
+ UDF (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_UDF, true, GenerateUdf.class),
/**
* This context is for POForEach. This will use the expected output of a ForEach
* to return a typed Tuple.
*/
- FOREACH (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_FOREACH, true, GenerateForeach.class),
+ FOREACH (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_FOREACH, true, GenerateForeach.class),
/**
* This context controls whether or not SchemaTuples will be used in FR joins.
* Currently, they will be used in the HashMap that FR Joins construct.
*/
- FR_JOIN (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_FRJOIN, true, GenerateFrJoin.class),
+ FR_JOIN (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_FRJOIN, true, GenerateFrJoin.class),
/**
* This context controls whether or not SchemaTuples will be used in merge joins.
*/
- MERGE_JOIN (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_MERGEJOIN, true, GenerateMergeJoin.class),
+ MERGE_JOIN (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_MERGEJOIN, true, GenerateMergeJoin.class),
/**
* All registered Schemas will also be registered in one additional context.
* This context will allow users to "force" the load of a SchemaTupleFactory
* if one is present in any context.
*/
- FORCE_LOAD (PigConfiguration.SCHEMA_TUPLE_SHOULD_ALLOW_FORCE, true, GenerateForceLoad.class);
+ FORCE_LOAD (PigConfiguration.PIG_SCHEMA_TUPLE_ALLOW_FORCE, true, GenerateForceLoad.class);
/**
* These annotations are used to mark a given SchemaTuple with
@@ -226,7 +226,7 @@ public class SchemaTupleClassGenerator {
*/
//TODO in the future, we can use ASM to generate the bytecode directly.
private static void compileCodeString(String className, String generatedCodeString, File codeDir) {
- JavaCompilerHelper compiler = new JavaCompilerHelper();
+ JavaCompilerHelper compiler = new JavaCompilerHelper();
String tempDir = codeDir.getAbsolutePath();
compiler.addToClassPath(tempDir);
LOG.debug("Compiling SchemaTuple code with classpath: " + compiler.getClassPath());
@@ -242,12 +242,14 @@ public class SchemaTupleClassGenerator {
this.id = id;
}
+ @Override
public void prepare() {
add("@Override");
add("protected int generatedCodeCompareToSpecific(SchemaTuple_"+id+" t) {");
add(" int i = 0;");
}
+ @Override
public void process(int fieldNum, Schema.FieldSchema fs) {
add(" i = compare(checkIfNull_" + fieldNum + "(), getPos_"
+ fieldNum + "(), t.checkIfNull_" + fieldNum + "(), t.getPos_"
@@ -257,6 +259,7 @@ public class SchemaTupleClassGenerator {
add(" }");
}
+ @Override
public void end() {
add(" return i;");
add("}");
@@ -271,6 +274,7 @@ public class SchemaTupleClassGenerator {
this.id = id;
}
+ @Override
public void prepare() {
add("@Override");
add("protected int generatedCodeCompareTo(SchemaTuple t, boolean checkType) {");
@@ -282,6 +286,7 @@ public class SchemaTupleClassGenerator {
boolean compIsNull = false;
boolean compByte = false;
+ @Override
public void process(int fieldNum, Schema.FieldSchema fs) {
add(" i = compareWithElementAtPos(checkIfNull_" + fieldNum + "(), getPos_" + fieldNum + "(), t, " + fieldNum + ");");
add(" if (i != 0) {");
@@ -289,6 +294,7 @@ public class SchemaTupleClassGenerator {
add(" }");
}
+ @Override
public void end() {
add(" return 0;");
add("}");
@@ -296,16 +302,19 @@ public class SchemaTupleClassGenerator {
}
static class HashCode extends TypeInFunctionStringOut {
+ @Override
public void prepare() {
add("@Override");
add("public int generatedCodeHashCode() {");
add(" int h = 17;");
}
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
add(" h = hashCodePiece(h, getPos_" + fieldPos + "(), checkIfNull_" + fieldPos + "());");
}
+ @Override
public void end() {
add(" return h;");
add("}");
@@ -323,6 +332,7 @@ public class SchemaTupleClassGenerator {
private int booleans = 0;
private File codeDir;
+ @Override
public void prepare() {
String s;
try {
@@ -333,6 +343,7 @@ public class SchemaTupleClassGenerator {
add("private static Schema schema = staticSchemaGen(\"" + s + "\");");
}
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
if (!isTuple()) {
if (isPrimitive() && (primitives++ % 8 == 0)) {
@@ -385,6 +396,7 @@ public class SchemaTupleClassGenerator {
private int byteField = 0; //this is for setting booleans
private int byteIncr = 0; //this is for counting the booleans we've encountered
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
if (!isTuple()) {
add("public void setPos_"+fieldPos+"("+typeName()+" v) {");
@@ -433,27 +445,32 @@ public class SchemaTupleClassGenerator {
}
static class ListSetString extends TypeInFunctionStringOut {
+ @Override
public void prepare() {
add("@Override");
add("public void generatedCodeSetIterator(Iterator<Object> it) throws ExecException {");
}
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
add(" setPos_"+fieldPos+"(unbox(it.next(), getDummy_"+fieldPos+"()));");
}
+ @Override
public void end() {
add("}");
}
}
static class GenericSetString extends TypeInFunctionStringOut {
+ @Override
public void prepare() {
add("@Override");
add("public void generatedCodeSetField(int fieldNum, Object val) throws ExecException {");
add(" switch (fieldNum) {");
}
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
add(" case ("+fieldPos+"):");
add(" if (val == null) {");
@@ -464,6 +481,7 @@ public class SchemaTupleClassGenerator {
add(" break;");
}
+ @Override
public void end() {
add(" default:");
add(" throw new ExecException(\"Invalid index given to set: \" + fieldNum);");
@@ -473,16 +491,19 @@ public class SchemaTupleClassGenerator {
}
static class GenericGetString extends TypeInFunctionStringOut {
+ @Override
public void prepare() {
add("@Override");
add("public Object generatedCodeGetField(int fieldNum) throws ExecException {");
add(" switch (fieldNum) {");
}
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
add(" case ("+fieldPos+"): return checkIfNull_"+fieldPos+"() ? null : box(getPos_"+fieldPos+"());");
}
+ @Override
public void end() {
add(" default: throw new ExecException(\"Invalid index given to get: \" + fieldNum);");
add(" }");
@@ -491,16 +512,19 @@ public class SchemaTupleClassGenerator {
}
static class GeneralIsNullString extends TypeInFunctionStringOut {
+ @Override
public void prepare() {
add("@Override");
add("public boolean isGeneratedCodeFieldNull(int fieldNum) throws ExecException {");
add(" switch (fieldNum) {");
}
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
add(" case ("+fieldPos+"): return checkIfNull_"+fieldPos+"();");
}
+ @Override
public void end() {
add(" default: throw new ExecException(\"Invalid index given: \" + fieldNum);");
add(" }");
@@ -512,6 +536,7 @@ public class SchemaTupleClassGenerator {
private int nullByte = 0; //the byte_ val
private int byteIncr = 0; //the mask we're on
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
add("public boolean checkIfNull_" + fieldPos + "() {");
if (isPrimitive()) {
@@ -532,6 +557,7 @@ public class SchemaTupleClassGenerator {
private int nullByte = 0; //the byte_ val
private int byteIncr = 0; //the mask we're on
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
add("public void setNull_"+fieldPos+"(boolean b) {");
if (isPrimitive()) {
@@ -554,11 +580,13 @@ public class SchemaTupleClassGenerator {
static class SetEqualToSchemaTupleSpecificString extends TypeInFunctionStringOut {
private int id;
+ @Override
public void prepare() {
add("@Override");
add("protected SchemaTuple generatedCodeSetSpecific(SchemaTuple_"+id+" t) {");
}
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
add(" if (t.checkIfNull_" + fieldPos + "()) {");
add(" setNull_" + fieldPos + "(true);");
@@ -568,6 +596,7 @@ public class SchemaTupleClassGenerator {
addBreak();
}
+ @Override
public void end() {
add(" return this;");
add("}");
@@ -586,6 +615,7 @@ public class SchemaTupleClassGenerator {
this.id = id;
}
+ @Override
public void prepare() {
add("@Override");
add("public boolean isSpecificSchemaTuple(Object o) {");
@@ -599,15 +629,18 @@ public class SchemaTupleClassGenerator {
static class WriteNullsString extends TypeInFunctionStringOut {
String s = " boolean[] b = {\n";
+ @Override
public void prepare() {
add("@Override");
add("protected boolean[] generatedCodeNullsArray() throws IOException {");
}
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
s += " checkIfNull_"+fieldPos+"(),\n";
}
+ @Override
public void end() {
s = s.substring(0, s.length() - 2) + "\n };";
add(s);
@@ -626,11 +659,13 @@ public class SchemaTupleClassGenerator {
private int booleans = 0;
+ @Override
public void prepare() {
add("@Override");
add("protected void generatedCodeReadFields(DataInput in, boolean[] b) throws IOException {");
}
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
if (isBoolean()) {
booleans++;
@@ -659,6 +694,7 @@ public class SchemaTupleClassGenerator {
}
}
+ @Override
public void end() {
if (booleans > 0) {
int i = 0;
@@ -679,6 +715,7 @@ public class SchemaTupleClassGenerator {
static class WriteString extends TypeInFunctionStringOut {
+ @Override
public void prepare() {
add("@Override");
add("protected void generatedCodeWriteElements(DataOutput out) throws IOException {");
@@ -686,6 +723,7 @@ public class SchemaTupleClassGenerator {
private int booleans = 0;
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
if (isBoolean()) {
booleans++;
@@ -697,6 +735,7 @@ public class SchemaTupleClassGenerator {
}
}
+ @Override
public void end() {
if (booleans > 0) {
int i = 0;
@@ -716,6 +755,7 @@ public class SchemaTupleClassGenerator {
String s = " return SizeUtil.roundToEight(";
+ @Override
public void prepare() {
add("@Override");
add("public long getGeneratedCodeMemorySize() {");
@@ -725,6 +765,7 @@ public class SchemaTupleClassGenerator {
private int primitives = 0;
//TODO a null array or object variable still takes up space for the pointer, yes?
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
if (isInt() || isFloat()) {
size += 4;
@@ -757,6 +798,7 @@ public class SchemaTupleClassGenerator {
}
}
+ @Override
public void end() {
s += size + ");";
add(s);
@@ -766,6 +808,7 @@ public class SchemaTupleClassGenerator {
}
static class GetDummyString extends TypeInFunctionStringOut {
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
add("public "+typeName()+" getDummy_"+fieldPos+"() {");
switch (fs.type) {
@@ -795,6 +838,7 @@ public class SchemaTupleClassGenerator {
private int booleanByte = 0;
private int booleans;
+ @Override
public void process(int fieldPos, Schema.FieldSchema fs) {
if (!isTuple()) {
add("public "+typeName()+" getPos_"+fieldPos+"() {");
@@ -823,6 +867,7 @@ public class SchemaTupleClassGenerator {
static class GetSchemaTupleIdentifierString extends TypeInFunctionStringOut {
private int id;
+ @Override
public void end() {
add("@Override");
add("public int getSchemaTupleIdentifier() {");
@@ -839,10 +884,12 @@ public class SchemaTupleClassGenerator {
static class SchemaSizeString extends TypeInFunctionStringOut {
int i = 0;
+ @Override
public void process(int fieldNum, Schema.FieldSchema fS) {
i++;
}
+ @Override
public void end() {
add("@Override");
add("protected int schemaSize() {");
@@ -855,10 +902,12 @@ public class SchemaTupleClassGenerator {
static class SizeString extends TypeInFunctionStringOut {
int i = 0;
+ @Override
public void process(int fieldNum, Schema.FieldSchema fS) {
i++;
}
+ @Override
public void end() {
add("@Override");
add("protected int generatedCodeSize() {");
@@ -873,16 +922,19 @@ public class SchemaTupleClassGenerator {
}
static class GetTypeString extends TypeInFunctionStringOut {
+ @Override
public void prepare() {
add("@Override");
add("public byte getGeneratedCodeFieldType(int fieldNum) throws ExecException {");
add(" switch (fieldNum) {");
}
+ @Override
public void process(int fieldNum, Schema.FieldSchema fs) {
add(" case ("+fieldNum+"): return "+fs.type+";");
}
+ @Override
public void end() {
add(" default: throw new ExecException(\"Invalid index given: \" + fieldNum);");
add(" }");
@@ -898,6 +950,7 @@ public class SchemaTupleClassGenerator {
this.id = id;
}
+ @Override
public void prepare() {
add("@Override");
add("protected SchemaTuple generatedCodeSet(SchemaTuple t, boolean checkClass) throws ExecException {");
@@ -913,6 +966,7 @@ public class SchemaTupleClassGenerator {
addBreak();
}
+ @Override
public void process(int fieldNum, Schema.FieldSchema fs) {
add(" if ("+fs.type+" != theirFS.get("+fieldNum+").type) {");
add(" throw new ExecException(\"Given SchemaTuple does not match current in field " + fieldNum + ". Expected type: " + fs.type + ", found: \" + theirFS.get("+fieldNum+").type);");
@@ -929,6 +983,7 @@ public class SchemaTupleClassGenerator {
addBreak();
}
+ @Override
public void end() {
add(" return this;");
add("}");
@@ -940,18 +995,21 @@ public class SchemaTupleClassGenerator {
super(type);
}
+ @Override
public void prepare() {
add("@Override");
add("protected "+name()+" generatedCodeGet"+properName()+"(int fieldNum) throws ExecException {");
add(" switch(fieldNum) {");
}
+ @Override
public void process(int fieldNum, Schema.FieldSchema fs) {
if (fs.type==thisType()) {
add(" case ("+fieldNum+"): return returnUnlessNull(checkIfNull_"+fieldNum+"(), getPos_"+fieldNum+"());");
}
}
+ @Override
public void end() {
add(" default:");
add(" return unbox"+properName()+"(getTypeAwareBase(fieldNum, \""+name()+"\"));");
@@ -979,17 +1037,20 @@ public class SchemaTupleClassGenerator {
return proper(thisType());
}
+ @Override
public void prepare() {
add("@Override");
add("protected void generatedCodeSet"+properName()+"(int fieldNum, "+name()+" val) throws ExecException {");
add(" switch(fieldNum) {");
}
+ @Override
public void process(int fieldNum, Schema.FieldSchema fs) {
if (fs.type==thisType())
add(" case ("+fieldNum+"): setPos_"+fieldNum+"(val); break;");
}
+ @Override
public void end() {
add(" default: setTypeAwareBase(fieldNum, val, \""+name()+"\");");
add(" }");
Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java Thu Nov 27 12:49:54 2014
@@ -17,7 +17,7 @@
*/
package org.apache.pig.data;
-import static org.apache.pig.PigConfiguration.SHOULD_USE_SCHEMA_TUPLE;
+import static org.apache.pig.PigConfiguration.PIG_SCHEMA_TUPLE_ENABLED;
import static org.apache.pig.PigConstants.GENERATED_CLASSES_KEY;
import static org.apache.pig.PigConstants.LOCAL_CODE_DIR;
import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT;
@@ -177,8 +177,8 @@ public class SchemaTupleFrontend {
*/
private boolean generateAll(Map<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> schemasToGenerate) {
boolean filesToShip = false;
- if (!conf.getBoolean(SHOULD_USE_SCHEMA_TUPLE, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
- LOG.info("Key ["+SHOULD_USE_SCHEMA_TUPLE+"] is false, will not generate code.");
+ if (!conf.getBoolean(PIG_SCHEMA_TUPLE_ENABLED, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
+ LOG.info("Key ["+PIG_SCHEMA_TUPLE_ENABLED+"] is false, will not generate code.");
return false;
}
LOG.info("Generating all registered Schemas.");
Modified: pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java Thu Nov 27 12:49:54 2014
@@ -63,7 +63,7 @@ public abstract class SelfSpillBag exten
maxMem = Runtime.getRuntime().maxMemory();
if (PigMapReduce.sJobConfInternal.get() != null) {
String usage = PigMapReduce.sJobConfInternal.get().get(
- PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
+ PigConfiguration.PIG_CACHEDBAG_MEMUSAGE);
if (usage != null) {
cachedMemUsage = Float.parseFloat(usage);
}
Modified: pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java Thu Nov 27 12:49:54 2014
@@ -491,19 +491,25 @@ public class SortedDataBag extends Defau
// the spill files list. So I need to append it to my
// linked list as well so that it's still there when I
// move my linked list back to the spill files.
+ DataOutputStream out = null;
try {
- DataOutputStream out = getSpillFile();
+ out = getSpillFile();
ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
Tuple t;
while ((t = readFromPriorityQ()) != null) {
t.write(out);
}
out.flush();
- out.close();
} catch (IOException ioe) {
String msg = "Unable to find our spill file.";
log.fatal(msg, ioe);
throw new RuntimeException(msg, ioe);
+ } finally {
+ try {
+ out.close();
+ } catch (IOException e) {
+ warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+ }
}
}
// delete files that have been merged into new files
Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Thu Nov 27 12:49:54 2014
@@ -48,13 +48,13 @@ import org.antlr.runtime.tree.Tree;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Level;
import org.apache.pig.ExecType;
import org.apache.pig.ExecTypeProvider;
import org.apache.pig.FuncSpec;
-import org.apache.pig.Main;
+import org.apache.pig.JVMReuseManager;
import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
@@ -65,7 +65,6 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.util.JarManager;
import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
import org.apache.pig.tools.parameters.ParseException;
import org.apache.pig.tools.parameters.PreprocessorContext;
@@ -105,7 +104,9 @@ public class PigContext implements Seria
* Resources for the job (jars, scripting udf files, cached macro abstract syntax trees)
*/
- // extra jar files that are needed to run a job
+ // Jar files that are global to the whole Pig script, including:
+ // 1. registered jars
+ // 2. Jars defined in -Dpig.additional.jars
transient public List<URL> extraJars = new LinkedList<URL>();
// original paths each extra jar came from
@@ -115,10 +116,6 @@ public class PigContext implements Seria
// jars needed for scripting udfs - jython.jar etc
transient public List<String> scriptJars = new ArrayList<String>(2);
- // jars that should not be merged in.
- // (some functions may come from pig.jar and we don't want the whole jar file.)
- transient public Vector<String> skipJars = new Vector<String>(2);
-
// jars that are predeployed to the cluster and thus should not be merged in at all (even subsets).
transient public Vector<String> predeployedJars = new Vector<String>(2);
@@ -174,6 +171,15 @@ public class PigContext implements Seria
// List of paths skipped for automatic shipping
List<String> skippedShipPaths = new ArrayList<String>();
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(PigContext.class);
+ }
+
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ packageImportList.set(null);
+ }
+
/**
* extends URLClassLoader to allow adding to classpath as new jars
* are registered.
@@ -260,13 +266,6 @@ public class PigContext implements Seria
this.properties = properties;
this.properties.setProperty("exectype", this.execType.name());
- String pigJar = JarManager.findContainingJar(Main.class);
- String hadoopJar = JarManager.findContainingJar(FileSystem.class);
- if (pigJar != null) {
- addSkipJar(pigJar);
- if (!pigJar.equals(hadoopJar))
- addSkipJar(hadoopJar);
- }
this.executionEngine = execType.getExecutionEngine(this);
@@ -345,12 +344,6 @@ public class PigContext implements Seria
}
}
- public void addSkipJar(String path) {
- if (path != null && !skipJars.contains(path)) {
- skipJars.add(path);
- }
- }
-
public void addJar(String path) throws MalformedURLException {
if (path != null) {
URL resource = (new File(path)).toURI().toURL();
@@ -409,6 +402,7 @@ public class PigContext implements Seria
public String doParamSubstitution(BufferedReader reader) throws IOException {
try {
+ preprocessorContext.setPigContext(this);
preprocessorContext.loadParamVal(params, paramFiles);
ParameterSubstitutionPreprocessor psp
= new ParameterSubstitutionPreprocessor(preprocessorContext);
Modified: pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java Thu Nov 27 12:49:54 2014
@@ -43,7 +43,24 @@ public class PigImplConstants {
public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local";
/**
+ * Used by pig to indicate that current job has been converted to run in fetch mode
+ */
+ public static final String CONVERTED_TO_FETCH = "pig.job.converted.fetch";
+
+ /**
* Indicate the split index of the task. Used by merge cogroup
*/
public static final String PIG_SPLIT_INDEX = "pig.split.index";
+
+ /**
+ * Parallelism for the reducer
+ */
+ public static final String REDUCER_DEFAULT_PARALLELISM = "pig.info.reducers.default.parallel";
+ public static final String REDUCER_REQUESTED_PARALLELISM = "pig.info.reducers.requested.parallel";
+ public static final String REDUCER_ESTIMATED_PARALLELISM = "pig.info.reducers.estimated.parallel";
+
+ /**
+ * Parallelism to be used for CROSS operation by GFCross UDF
+ */
+ public static final String PIG_CROSS_PARALLELISM = "pig.cross.parallelism";
}
Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java Thu Nov 27 12:49:54 2014
@@ -173,7 +173,7 @@ public class FindQuantiles extends EvalF
}
long numSamples = samples.size();
double toSkip = (double)numSamples / numQuantiles;
- if(toSkip == 0) {
+ if(toSkip < 1) {
// numSamples is < numQuantiles;
// set numQuantiles to numSamples
numQuantiles = (int)numSamples;
Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java Thu Nov 27 12:49:54 2014
@@ -22,13 +22,12 @@ import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.EvalFunc;
-import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.util.UDFContext;
@@ -56,16 +55,19 @@ public class GFCross extends EvalFunc<Da
parallelism = DEFAULT_PARALLELISM;
Configuration cfg = UDFContext.getUDFContext().getJobConf();
if (cfg != null) {
- String s = cfg.get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey);
+ String s = cfg.get(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey);
if (s == null) {
throw new IOException("Unable to get parallelism hint from job conf");
}
parallelism = Integer.valueOf(s);
+ if (parallelism < 0) {
+ throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey + " was " + parallelism);
+ }
}
numInputs = (Integer)input.get(0);
myNumber = (Integer)input.get(1);
-
+
numGroupsPerInput = (int) Math.ceil(Math.pow(parallelism, 1.0/numInputs));
numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
}
@@ -73,21 +75,21 @@ public class GFCross extends EvalFunc<Da
DataBag output = mBagFactory.newDefaultBag();
try{
-
+
int[] digits = new int[numInputs];
digits[myNumber] = r.nextInt(numGroupsPerInput);
for (int i=0; i<numGroupsGoingTo; i++){
output.add(toTuple(digits));
next(digits);
- }
-
+ }
+
return output;
}catch(ExecException e){
throw e;
}
}
-
+
private Tuple toTuple(int[] digits) throws IOException, ExecException{
Tuple t = mTupleFactory.newTuple(numInputs);
for (int i=0; i<numInputs; i++){
@@ -95,7 +97,7 @@ public class GFCross extends EvalFunc<Da
}
return t;
}
-
+
private void next(int[] digits){
for (int i=0; i<numInputs; i++){
if (i== myNumber)
Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Nov 27 12:49:54 2014
@@ -172,8 +172,8 @@ public class PoissonSampleLoader extends
newSample = null;
Configuration conf = split.getConf();
- sampleRate = conf.getInt(PigConfiguration.SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
- heapPerc = conf.getFloat(PigConfiguration.PERC_MEM_AVAIL,
+ sampleRate = conf.getInt(PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
+ heapPerc = conf.getFloat(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE,
PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
}
Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java Thu Nov 27 12:49:54 2014
@@ -206,7 +206,7 @@ public class StreamingUDF extends EvalFu
filePath.substring(0, lastSeparator - 1);
command[UDF_NAME] = funcName;
String fileCachePath = jobDir + filePath.substring(0, lastSeparator);
- command[PATH_TO_FILE_CACHE] = "\"" + fileCachePath + "\"";
+ command[PATH_TO_FILE_CACHE] = "'" + fileCachePath + "'";
command[STD_OUT_OUTPUT_PATH] = outFileName;
command[STD_ERR_OUTPUT_PATH] = errOutFileName;
command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName;
@@ -227,7 +227,8 @@ public class StreamingUDF extends EvalFu
File userUdfFile = new File(fileCachePath + command[UDF_FILE_NAME] + getUserFileExtension());
if (!userUdfFile.exists()) {
- String absolutePath = filePath.startsWith("/") ? filePath : File.separator + filePath;
+ String absolutePath = filePath.startsWith("/") ? filePath : "/" + filePath;
+ absolutePath = absolutePath.replaceAll(":", "");
String controllerDir = new File(command[PATH_TO_CONTROLLER_FILE]).getParent();
String userUdfPath = controllerDir + absolutePath + getUserFileExtension();
userUdfFile = new File(userUdfPath);
Modified: pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java Thu Nov 27 12:49:54 2014
@@ -478,15 +478,15 @@ public class FileLocalizer {
* since resourthPath should be available in the entire session
*
* @param pigContext
- * @return
+ * @return temporary resource path
* @throws DataStorageException
*/
- public static synchronized ContainerDescriptor getTemporaryResourcePath(final PigContext pigContext)
+ public static synchronized Path getTemporaryResourcePath(final PigContext pigContext)
throws DataStorageException {
if (resourcePath == null) {
resourcePath = getTempContainer(pigContext);
}
- return resourcePath;
+ return ((HPath)resourcePath).getPath();
}
private static synchronized ContainerDescriptor getTempContainer(final PigContext pigContext)
@@ -787,6 +787,9 @@ public class FileLocalizer {
boolean multipleFiles) throws IOException {
Path path = new Path(filePath);
+ if (path.getName().isEmpty()) {
+ return new FetchFileRet[0];
+ }
URI uri = path.toUri();
Configuration conf = new Configuration();
ConfigurationUtil.mergeConf(conf, ConfigurationUtil.toConfiguration(properties));
@@ -800,7 +803,7 @@ public class FileLocalizer {
&& uri.getScheme() == null )||
// For Windows local files
(uri.getScheme() == null && uri.getPath().matches("^/[A-Za-z]:.*")) ||
- (uri.getScheme() != null && uri.getScheme().equals("local"))
+ (uri.getScheme() != null && uri.getScheme().equals("local"))
) {
srcFs = localFs;
} else {
@@ -859,14 +862,20 @@ public class FileLocalizer {
dest.getParentFile().mkdirs();
dest.deleteOnExit();
- OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(dest));
- byte[] buffer = new byte[1024];
- int len;
- while ((len=resourceStream.read(buffer)) > 0) {
- outputStream.write(buffer,0,len);
+ OutputStream outputStream = null;
+ try {
+ outputStream = new BufferedOutputStream(new FileOutputStream(dest));
+ byte[] buffer = new byte[1024];
+ int len;
+ while ((len=resourceStream.read(buffer)) > 0) {
+ outputStream.write(buffer,0,len);
+ }
+ } finally {
+ resourceStream.close();
+ if (outputStream != null) {
+ outputStream.close();
+ }
}
- outputStream.close();
-
localFileRet = new FetchFileRet(dest,false);
}
else
Modified: pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java Thu Nov 27 12:49:54 2014
@@ -213,6 +213,9 @@ public class ReadToEndLoader extends Loa
// input completely
PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1,
new ArrayList<OperatorKey>(), -1);
+ // Set the conf object so that if the wrappedLoadFunc uses it,
+ // it won't be null
+ pigSplit.setConf(conf);
wrappedLoadFunc.prepareToRead(reader, pigSplit);
return true;
}
Modified: pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java Thu Nov 27 12:49:54 2014
@@ -47,10 +47,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
import org.apache.tools.bzip2r.BZip2Constants;
-import org.codehaus.jackson.annotate.JsonPropertyOrder;
-import org.codehaus.jackson.map.annotate.JacksonStdImpl;
import org.joda.time.DateTime;
import com.google.common.collect.Multimaps;
@@ -68,8 +67,6 @@ public class JarManager {
AUTOMATON(Automaton.class),
ANTLR(CommonTokenStream.class),
GUAVA(Multimaps.class),
- JACKSON_CORE(JsonPropertyOrder.class),
- JACKSON_MAPPER(JacksonStdImpl.class),
JODATIME(DateTime.class);
private final Class pkgClass;
@@ -92,9 +89,16 @@ public class JarManager {
createPigScriptUDFJar(fos, pigContext, contents);
if (!contents.isEmpty()) {
- FileInputStream fis = new FileInputStream(scriptUDFJarFile);
- String md5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(fis);
- fis.close();
+ FileInputStream fis = null;
+ String md5 = null;
+ try {
+ fis = new FileInputStream(scriptUDFJarFile);
+ md5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(fis);
+ } finally {
+ if (fis != null) {
+ fis.close();
+ }
+ }
File newScriptUDFJarFile = new File(scriptUDFJarFile.getParent(), "PigScriptUDF-" + md5 + ".jar");
scriptUDFJarFile.renameTo(newScriptUDFJarFile);
return newScriptUDFJarFile;
@@ -107,15 +111,20 @@ public class JarManager {
for (String path: pigContext.scriptFiles) {
log.debug("Adding entry " + path + " to job jar" );
InputStream stream = null;
- if (new File(path).exists()) {
- stream = new FileInputStream(new File(path));
+ File inputFile = new File(path);
+ if (inputFile.exists()) {
+ stream = new FileInputStream(inputFile);
} else {
stream = PigContext.getClassLoader().getResourceAsStream(path);
}
if (stream==null) {
throw new IOException("Cannot find " + path);
}
- addStream(jarOutputStream, path, stream, contents);
+ try {
+ addStream(jarOutputStream, path, stream, contents, inputFile.lastModified());
+ } finally {
+ stream.close();
+ }
}
for (Map.Entry<String, File> entry : pigContext.getScriptFiles().entrySet()) {
log.debug("Adding entry " + entry.getKey() + " to job jar" );
@@ -128,7 +137,11 @@ public class JarManager {
if (stream==null) {
throw new IOException("Cannot find " + entry.getValue().getPath());
}
- addStream(jarOutputStream, entry.getKey(), stream, contents);
+ try {
+ addStream(jarOutputStream, entry.getKey(), stream, contents, entry.getValue().lastModified());
+ } finally {
+ stream.close();
+ }
}
if (!contents.isEmpty()) {
jarOutputStream.close();
@@ -139,7 +152,7 @@ public class JarManager {
/**
* Creates a Classloader based on the passed jarFile and any extra jar files.
- *
+ *
* @param jarFile
* the jar file to be part of the newly created Classloader. This jar file plus any
* jars in the extraJars list will constitute the classpath.
@@ -161,7 +174,7 @@ public class JarManager {
/**
* Adds a stream to a Jar file.
- *
+ *
* @param os
* the OutputStream of the Jar file to which the stream will be added.
* @param name
@@ -171,15 +184,20 @@ public class JarManager {
* @param contents
* the current contents of the Jar file. (We use this to avoid adding two streams
* with the same name.
+ * @param timestamp
+ * timestamp of the entry
* @throws IOException
*/
- private static void addStream(JarOutputStream os, String name, InputStream is, Map<String, String> contents)
+ private static void addStream(JarOutputStream os, String name, InputStream is, Map<String, String> contents,
+ long timestamp)
throws IOException {
if (contents.get(name) != null) {
return;
}
contents.put(name, "");
- os.putNextEntry(new JarEntry(name));
+ JarEntry entry = new JarEntry(name);
+ entry.setTime(timestamp);
+ os.putNextEntry(entry);
byte buffer[] = new byte[4096];
int rc;
while ((rc = is.read(buffer)) > 0) {
@@ -190,6 +208,9 @@ public class JarManager {
public static List<String> getDefaultJars() {
List<String> defaultJars = new ArrayList<String>();
for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
+ if(pkgToSend.equals(DefaultPigPackages.GUAVA) && HadoopShims.isHadoopYARN()) {
+ continue; //Skip
+ }
String jar = findContainingJar(pkgToSend.getPkgClass());
if (!defaultJars.contains(jar)) {
defaultJars.add(jar);
@@ -201,7 +222,7 @@ public class JarManager {
/**
* Find a jar that contains a class of the same name, if any. It will return a jar file, even if
* that is not the first thing on the class path that has a class with the same name.
- *
+ *
* @param my_class
* the class to find
* @return a jar file that contains the class, or null
@@ -243,12 +264,12 @@ public class JarManager {
}
return null;
}
-
+
/**
* Add the jars containing the given classes to the job's configuration
* such that JobClient will ship them to the cluster and add them to
* the DistributedCache
- *
+ *
* @param job
* Job object
* @param classes
@@ -266,10 +287,10 @@ public class JarManager {
return;
conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
}
-
+
/**
- * Add the qualified path name of jars containing the given classes
- *
+ * Add the qualified path name of jars containing the given classes
+ *
* @param fs
* FileSystem object
* @param jars
Modified: pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java Thu Nov 27 12:49:54 2014
@@ -145,9 +145,9 @@ public class PropertiesUtil {
properties.setProperty("stop.on.failure", ""+false);
}
- if (properties.getProperty(PigConfiguration.OPT_FETCH) == null) {
+ if (properties.getProperty(PigConfiguration.PIG_OPT_FETCH) == null) {
//by default fetch optimization is on
- properties.setProperty(PigConfiguration.OPT_FETCH, ""+true);
+ properties.setProperty(PigConfiguration.PIG_OPT_FETCH, ""+true);
}
}
Modified: pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java Thu Nov 27 12:49:54 2014
@@ -43,38 +43,38 @@ import org.apache.commons.logging.LogFac
* <p>
* Low memory is defined as more than 50% of the tenured pool being allocated. Spillable objects are
* tracked using WeakReferences so that the objects can be GCed even though this class has a reference
- * to them.
+ * to them.
*
*/
public class SpillableMemoryManager implements NotificationListener {
-
+
private final Log log = LogFactory.getLog(getClass());
-
+
LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
-
- // if we freed at least this much, invoke GC
+
+ // if we freed at least this much, invoke GC
// (default 40 MB - this can be overridden by user supplied property)
private static long gcActivationSize = 40000000L ;
-
+
// spill file size should be at least this much
// (default 5MB - this can be overridden by user supplied property)
private static long spillFileSizeThreshold = 5000000L ;
-
+
// this will keep track of memory freed across spills
// and between GC invocations
private static long accumulatedFreeSize = 0L;
-
+
// fraction of biggest heap for which we want to get
// "memory usage threshold exceeded" notifications
private static double memoryThresholdFraction = 0.7;
-
+
// fraction of biggest heap for which we want to get
// "collection threshold exceeded" notifications
private static double collectionMemoryThresholdFraction = 0.5;
-
+
// log notification on usage threshold exceeded only the first time
private boolean firstUsageThreshExceededLogged = false;
-
+
// log notification on collection threshold exceeded only the first time
private boolean firstCollectionThreshExceededLogged = false;
@@ -82,54 +82,52 @@ public class SpillableMemoryManager impl
// if we want to perform an extra gc before the spill
private static double extraGCThresholdFraction = 0.05;
private static long extraGCSpillSizeThreshold = 0L;
-
+
private static volatile SpillableMemoryManager manager;
private SpillableMemoryManager() {
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);
List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
- MemoryPoolMXBean biggestHeap = null;
- long biggestSize = 0;
+ MemoryPoolMXBean tenuredHeap = null;
+ long tenuredHeapSize = 0;
long totalSize = 0;
- for (MemoryPoolMXBean b: mpbeans) {
- log.debug("Found heap (" + b.getName() +
- ") of type " + b.getType());
- if (b.getType() == MemoryType.HEAP) {
- /* Here we are making the leap of faith that the biggest
- * heap is the tenured heap
- */
- long size = b.getUsage().getMax();
+ for (MemoryPoolMXBean pool : mpbeans) {
+ log.debug("Found heap (" + pool.getName() + ") of type " + pool.getType());
+ if (pool.getType() == MemoryType.HEAP) {
+ long size = pool.getUsage().getMax();
totalSize += size;
- if (size > biggestSize) {
- biggestSize = size;
- biggestHeap = b;
+ // CMS Old Gen or "tenured" is the only heap that supports
+ // setting usage threshold.
+ if (pool.isUsageThresholdSupported()) {
+ tenuredHeapSize = size;
+ tenuredHeap = pool;
}
}
}
extraGCSpillSizeThreshold = (long) (totalSize * extraGCThresholdFraction);
- if (biggestHeap == null) {
+ if (tenuredHeap == null) {
throw new RuntimeException("Couldn't find heap");
}
log.debug("Selected heap to monitor (" +
- biggestHeap.getName() + ")");
-
- // we want to set both collection and usage threshold alerts to be
+ tenuredHeap.getName() + ")");
+
+ // we want to set both collection and usage threshold alerts to be
// safe. In some local tests after a point only collection threshold
// notifications were being sent though usage threshold notifications
// were sent early on. So using both would ensure that
// 1) we get notified early (though usage threshold exceeded notifications)
// 2) we get notified always when threshold is exceeded (either usage or
// collection)
-
+
/* We set the threshold to be 50% of tenured since that is where
* the GC starts to dominate CPU time according to Sun doc */
- biggestHeap.setCollectionUsageThreshold((long)(biggestSize * collectionMemoryThresholdFraction));
+ tenuredHeap.setCollectionUsageThreshold((long)(tenuredHeapSize * collectionMemoryThresholdFraction));
// we set a higher threshold for usage threshold exceeded notification
// since this is more likely to be effective sooner and we do not
// want to be spilling too soon
- biggestHeap.setUsageThreshold((long)(biggestSize * memoryThresholdFraction));
+ tenuredHeap.setUsageThreshold((long)(tenuredHeapSize * memoryThresholdFraction));
}
-
+
public static SpillableMemoryManager getInstance() {
if (manager == null) {
manager = new SpillableMemoryManager();
@@ -138,21 +136,21 @@ public class SpillableMemoryManager impl
}
public static void configure(Properties properties) {
-
+
try {
-
+
spillFileSizeThreshold = Long.parseLong(
properties.getProperty("pig.spill.size.threshold") ) ;
-
+
gcActivationSize = Long.parseLong(
properties.getProperty("pig.spill.gc.activation.size") ) ;
- }
+ }
catch (NumberFormatException nfe) {
throw new RuntimeException("Error while converting system configurations" +
"spill.size.threshold, spill.gc.activation.size", nfe) ;
}
}
-
+
@Override
public void handleNotification(Notification n, Object o) {
CompositeData cd = (CompositeData) n.getUserData();
@@ -166,7 +164,7 @@ public class SpillableMemoryManager impl
toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5);
//log
- String msg = "memory handler call- Usage threshold "
+ String msg = "memory handler call- Usage threshold "
+ info.getUsage();
if(!firstUsageThreshExceededLogged){
log.info("first " + msg);
@@ -177,7 +175,7 @@ public class SpillableMemoryManager impl
} else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
long threshold = (long)(info.getUsage().getMax() * collectionMemoryThresholdFraction);
toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5);
-
+
//log
String msg = "memory handler call - Collection threshold "
+ info.getUsage();
@@ -191,7 +189,7 @@ public class SpillableMemoryManager impl
}
clearSpillables();
if (toFree < 0) {
- log.debug("low memory handler returning " +
+ log.debug("low memory handler returning " +
"because there is nothing to free");
return;
}
@@ -203,7 +201,7 @@ public class SpillableMemoryManager impl
* becomes null, but it will be close enough.
* Also between the time we sort and we use these spillables, they
* may actually change in size - so this is just best effort
- */
+ */
@Override
public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
Spillable o1 = o1Ref.get();
@@ -219,7 +217,7 @@ public class SpillableMemoryManager impl
}
long o1Size = o1.getMemorySize();
long o2Size = o2.getMemorySize();
-
+
if (o1Size == o2Size) {
return 0;
}
@@ -254,8 +252,10 @@ public class SpillableMemoryManager impl
// we force GC to make sure we really need to keep this
// object before paying for the expensive spill().
// Done at most once per handleNotification.
+ // Do not invoke extraGC for GroupingSpillable. Its size will always exceed
+ // extraGCSpillSizeThreshold and the data is always strong referenced.
if( !extraGCCalled && extraGCSpillSizeThreshold != 0
- && toBeFreed > extraGCSpillSizeThreshold ) {
+ && toBeFreed > extraGCSpillSizeThreshold && !(s instanceof GroupingSpillable)) {
log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
// this extra assignment to null is needed so that gc can free the
// spillable if nothing else is pointing at it
@@ -271,7 +271,7 @@ public class SpillableMemoryManager impl
continue;
}
}
- s.spill();
+ s.spill();
numObjSpilled++;
estimatedFreed += toBeFreed;
accumulatedFreeSize += toBeFreed;
@@ -280,13 +280,13 @@ public class SpillableMemoryManager impl
if (accumulatedFreeSize > gcActivationSize) {
invokeGC = true;
}
-
+
if (estimatedFreed > toFree) {
log.debug("Freed enough space - getting out of memory handler");
invokeGC = true;
break;
}
- }
+ }
/* Poke the GC again to see if we successfully freed enough memory */
if(invokeGC) {
System.gc();
@@ -301,7 +301,7 @@ public class SpillableMemoryManager impl
}
}
-
+
public void clearSpillables() {
synchronized (spillables) {
// Walk the list first and remove nulls, otherwise the sort
Modified: pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java Thu Nov 27 12:49:54 2014
@@ -24,6 +24,8 @@ import java.util.HashMap;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
public class UDFContext {
@@ -41,6 +43,10 @@ public class UDFContext {
}
};
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(UDFContext.class);
+ }
+
private UDFContext() {
udfConfs = new HashMap<UDFContextKey, Properties>();
}
@@ -62,7 +68,8 @@ public class UDFContext {
/*
* internal pig use only - should NOT be called from user code
*/
- public static void destroy() {
+ @StaticDataCleanup
+ public static void cleanupStaticData() {
tss = new ThreadLocal<UDFContext>() {
@Override
public UDFContext initialValue() {
Modified: pig/branches/spark/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/Utils.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/Utils.java Thu Nov 27 12:49:54 2014
@@ -23,10 +23,8 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.io.PrintStream;
import java.io.SequenceInputStream;
-import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
@@ -42,12 +40,10 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobConf;
@@ -65,7 +61,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.io.ReadToEndLoader;
@@ -76,6 +71,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.parser.ParserException;
import org.apache.pig.parser.QueryParserDriver;
+import org.joda.time.DateTimeZone;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
@@ -85,17 +81,32 @@ import com.google.common.primitives.Long
*/
public class Utils {
private static final Log log = LogFactory.getLog(Utils.class);
-
+ private static final Pattern JAVA_MAXHEAPSIZE_PATTERN = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
+
+
/**
* This method checks whether JVM vendor is IBM
* @return true if IBM JVM is being used
* false otherwise
*/
- public static boolean isVendorIBM() {
+ public static boolean isVendorIBM() {
return System.getProperty("java.vendor").contains("IBM");
}
-
-
+
+ public static boolean isHadoop23() {
+ String version = org.apache.hadoop.util.VersionInfo.getVersion();
+ if (version.matches("\\b0\\.23\\..+\\b"))
+ return true;
+ return false;
+ }
+
+ public static boolean isHadoop2() {
+ String version = org.apache.hadoop.util.VersionInfo.getVersion();
+ if (version.matches("\\b2\\.\\d+\\..+"))
+ return true;
+ return false;
+ }
+
/**
* This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
* checks if two objects are equals - two levels of checks are
@@ -238,7 +249,7 @@ public class Utils {
}
public static LogicalSchema parseSchema(String schemaString) throws ParserException {
- QueryParserDriver queryParser = new QueryParserDriver( new PigContext(),
+ QueryParserDriver queryParser = new QueryParserDriver( new PigContext(),
"util", new HashMap<String, String>() ) ;
LogicalSchema schema = queryParser.parseSchema(schemaString);
return schema;
@@ -249,7 +260,7 @@ public class Utils {
* field. This will be called only when PigStorage is invoked with
* '-tagFile' or '-tagPath' option and the schema file is present to be
* loaded.
- *
+ *
* @param schema
* @param fieldName
* @return ResourceSchema
@@ -383,7 +394,7 @@ public class Utils {
} else if (TEMPFILE_STORAGE.TFILE.lowerName().equals(tmpFileCompressionStorage)) {
return TEMPFILE_STORAGE.TFILE;
} else {
- throw new IllegalArgumentException("Unsupported storage format " + tmpFileCompressionStorage +
+ throw new IllegalArgumentException("Unsupported storage format " + tmpFileCompressionStorage +
". Should be one of " + Arrays.toString(TEMPFILE_STORAGE.values()));
}
}
@@ -582,7 +593,7 @@ public class Utils {
// substitute
eval = eval.substring(0, match.start())+val+eval.substring(match.end());
}
- throw new IllegalStateException("Variable substitution depth too large: "
+ throw new IllegalStateException("Variable substitution depth too large: "
+ MAX_SUBST + " " + expr);
}
@@ -648,4 +659,35 @@ public class Utils {
return null;
}
+
+    /** Returns the heap size in MB from the last -Xmx option (m/M or g/G suffix) in input, or 0 if none. */
+    public static int extractHeapSizeInMB(String input) {
+        if (input == null || input.equals("")) {
+            return 0;
+        }
+        String sizeWithUnit = null;
+        String sizeDigits = null;
+        // The last -Xmx option is the one that takes effect, so keep overwriting until matches run out.
+        for (Matcher xmx = JAVA_MAXHEAPSIZE_PATTERN.matcher(input); xmx.find();) {
+            sizeWithUnit = xmx.group(1);
+            sizeDigits = xmx.group(2);
+        }
+        if (sizeWithUnit == null) {
+            // no recognizable -Xmx option was present
+            return 0;
+        }
+        final int amount = Integer.parseInt(sizeDigits);
+        // a g/G suffix means gigabytes; the only other suffix the pattern admits is m/M
+        final boolean inGigabytes = sizeWithUnit.endsWith("g") || sizeWithUnit.endsWith("G");
+        // scale gigabytes up to megabytes, leave megabytes untouched
+        return inGigabytes ? amount * 1024 : amount;
+    }
+
+    /** Makes the configured default time zone, if any, the Joda-Time default; don't use offsets because it breaks across DST/Standard Time. */
+    public static void setDefaultTimeZone(Configuration conf) {
+        final String zoneId = conf.get(PigConfiguration.PIG_DATETIME_DEFAULT_TIMEZONE);
+        if (zoneId != null && !zoneId.isEmpty()) {
+            DateTimeZone.setDefault(DateTimeZone.forID(zoneId));
+        }
+    }
}
Modified: pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java Thu Nov 27 12:49:54 2014
@@ -27,7 +27,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
@@ -57,6 +60,7 @@ import org.apache.pig.data.DataByteArray
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.joda.time.DateTime;
public class OrcUtils {
@@ -92,7 +96,14 @@ public class OrcUtils {
for (Map.Entry<Object, Object> entry : m.entrySet()) {
Object convertedKey = convertOrcToPig(entry.getKey(), keyObjectInspector, null);
Object convertedValue = convertOrcToPig(entry.getValue(), valueObjectInspector, null);
- ((Map)result).put(convertedKey.toString(), convertedValue);
+ if (convertedKey!=null) {
+ ((Map)result).put(convertedKey.toString(), convertedValue);
+ } else {
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ if (reporter != null) {
+ reporter.incrCounter(PigWarning.UDF_WARNING_1, 1);
+ }
+ }
}
break;
case LIST:
@@ -125,6 +136,12 @@ public class OrcUtils {
case STRING:
result = poi.getPrimitiveJavaObject(obj);
break;
+ case CHAR:
+ result = ((HiveChar)poi.getPrimitiveJavaObject(obj)).getValue();
+ break;
+ case VARCHAR:
+ result = ((HiveVarchar)poi.getPrimitiveJavaObject(obj)).getValue();
+ break;
case BYTE:
result = (int)(Byte)poi.getPrimitiveJavaObject(obj);
break;
@@ -222,6 +239,12 @@ public class OrcUtils {
case STRING:
fieldSchema.setType(DataType.CHARARRAY);
break;
+ case VARCHAR:
+ fieldSchema.setType(DataType.CHARARRAY);
+ break;
+ case CHAR:
+ fieldSchema.setType(DataType.CHARARRAY);
+ break;
case TIMESTAMP:
fieldSchema.setType(DataType.DATETIME);
break;
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Thu Nov 27 12:49:54 2014
@@ -512,7 +512,7 @@ public class ExpToPhyTranslationVisitor
}
List<String> cacheFiles = ((EvalFunc)f).getCacheFiles();
if (cacheFiles != null) {
- ((POUserFunc)p).setCacheFiles(cacheFiles.toArray(new String[cacheFiles.size()]));
+ ((POUserFunc)p).setCacheFiles(cacheFiles);
}
} else {
p = new POUserComparisonFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Thu Nov 27 12:49:54 2014
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreResources;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -142,6 +143,8 @@ public class LogToPhyTranslationVisitor
load.setSignature(loLoad.getSignature());
load.setLimit(loLoad.getLimit());
load.setIsTmpLoad(loLoad.isTmpLoad());
+ load.setCacheFiles(loLoad.getLoadFunc().getCacheFiles());
+ load.setShipFiles(loLoad.getLoadFunc().getShipFiles());
currentPlan.add(load);
logToPhyMap.put(loLoad, load);
@@ -631,6 +634,7 @@ public class LogToPhyTranslationVisitor
List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
+ fe.setMapSideOnly(true);
fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
currentPlan.add(fe);
currentPlan.connect(logToPhyMap.get(op), fe);
@@ -953,8 +957,11 @@ public class LogToPhyTranslationVisitor
store.setSortInfo(loStore.getSortInfo());
store.setIsTmpStore(loStore.isTmpStore());
store.setStoreFunc(loStore.getStoreFunc());
-
store.setSchema(Util.translateSchema( loStore.getSchema() ));
+ if (loStore.getStoreFunc() instanceof StoreResources) {
+ store.setCacheFiles(((StoreResources)loStore.getStoreFunc()).getCacheFiles());
+ store.setShipFiles(((StoreResources)loStore.getStoreFunc()).getShipFiles());
+ }
currentPlan.add(store);