You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@pig.apache.org by jc...@apache.org on 2012/08/09 02:48:16 UTC
svn commit: r1371022 - in /pig/trunk: ./ src/org/apache/pig/data/
test/org/apache/pig/data/
Author: jcoveney
Date: Thu Aug 9 00:48:16 2012
New Revision: 1371022
URL: http://svn.apache.org/viewvc?rev=1371022&view=rev
Log:
[PIG-2862] Hardcode certain tuple lengths into the TUPLE BinInterSedes byte identifier (jcoveney)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/data/AbstractTuple.java
pig/trunk/src/org/apache/pig/data/DefaultTuple.java
pig/trunk/src/org/apache/pig/data/SchemaTuple.java
pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java
pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 9 00:48:16 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2862: Hardcode certain tuple lengths into the TUPLE BinInterSedes byte identifier (jcoveney)
+
PIG-2855: Provide a method to measure time spent in UDFs (dvryaboy)
PIG-2837: AvroStorage throws StackOverFlowError (cheolsoo via sms)
Modified: pig/trunk/src/org/apache/pig/data/AbstractTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/AbstractTuple.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/AbstractTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/AbstractTuple.java Thu Aug 9 00:48:16 2012
@@ -19,8 +19,8 @@ package org.apache.pig.data;
import java.util.Iterator;
-import org.apache.pig.impl.util.TupleFormat;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.util.TupleFormat;
import com.google.common.base.Joiner;
@@ -67,4 +67,9 @@ public abstract class AbstractTuple impl
public boolean equals(Object other) {
return (compareTo(other) == 0);
}
+
+ /**
+ * Unsupported: {@code Tuple#reference(Tuple)} is deprecated and must not be used.
+ *
+ * @param t ignored
+ * @throws UnsupportedOperationException always (a RuntimeException, so existing handlers still apply)
+ */
+ @Override
+ public void reference(Tuple t) {
+ throw new UnsupportedOperationException("Tuple#reference(Tuple) is deprecated and should not be used");
+ }
}
Modified: pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultTuple.java Thu Aug 9 00:48:16 2012
@@ -93,18 +93,6 @@ public class DefaultTuple extends Abstra
}
/**
- * Make this tuple reference the contents of another. This method does not copy the underlying data. It maintains
- * references to the data from the original tuple (and possibly even to the data structure holding the data).
- *
- * @param t
- * Tuple to reference.
- */
- @Override
- public void reference(Tuple t) {
- mFields = t.getAll();
- }
-
- /**
* Find the size of the tuple. Used to be called arity().
*
* @return number of fields in the tuple.
Modified: pig/trunk/src/org/apache/pig/data/SchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTuple.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTuple.java Thu Aug 9 00:48:16 2012
@@ -153,6 +153,10 @@ public abstract class SchemaTuple<T exte
bis.writeDatum(out, v, DataType.BAG);
}
+ /** Serializes a map-typed field by delegating to {@code bis.writeDatum} with {@code DataType.MAP}. */
+ protected static void write(DataOutput out, Map<String, Object> v) throws IOException {
bis.writeDatum(out, v, DataType.MAP);
+ }
+
protected static void write(DataOutput out, int v) throws IOException {
SedesHelper.Varint.writeSignedVarInt(v, out);
}
@@ -185,6 +189,11 @@ public abstract class SchemaTuple<T exte
return (DataBag) bis.readDatum(in, DataType.BAG);
}
+ /**
+ * Deserializes a map-typed field via {@code bis.readDatum(in, DataType.MAP)}.
+ * The {@code v} argument is not read; it only selects this overload.
+ */
+ @SuppressWarnings("unchecked")
+ protected static Map<String, Object> read(DataInput in, Map<String, Object> v) throws IOException {
return (Map<String, Object>) bis.readDatum(in, DataType.MAP);
+ }
+
protected static int read(DataInput in, int v) throws IOException {
return SedesHelper.Varint.readSignedVarInt(in);
}
@@ -336,6 +345,10 @@ public abstract class SchemaTuple<T exte
return unbox((DataBag) t);
}
+ /**
+ * Converts the untyped value {@code v} to a map. {@code t} is only an overload-selecting
+ * type witness; the value being unboxed is {@code v}, not {@code t}.
+ */
+ @SuppressWarnings("unchecked")
+ protected Map<String, Object> unbox(Object v, Map<String, Object> t) {
return unbox((Map<String, Object>) v);
+ }
+
protected byte[] unbox(Object v, byte[] t) {
return unbox((DataByteArray)v);
}
@@ -372,6 +385,10 @@ public abstract class SchemaTuple<T exte
return v;
}
+ /** Identity: a map field is stored as-is, so no unboxing conversion is needed. */
+ protected Map<String, Object> unbox(Map<String, Object> v) {
return v;
+ }
+
protected byte[] unbox(DataByteArray v) {
if (v == null) {
return null;
@@ -403,6 +420,10 @@ public abstract class SchemaTuple<T exte
return v;
}
+ /** Identity: a map field is exposed as-is, so no boxing conversion is needed. */
+ protected Map<String, Object> box(Map<String, Object> v) {
return v;
+ }
+
protected DataByteArray box(byte[] v) {
if (v == null) {
return null;
@@ -475,6 +496,10 @@ public abstract class SchemaTuple<T exte
return isNull ? hash : 31 * hash + v.hashCode();
}
+ /**
+ * Folds a map field into a running hash: returns {@code hash} unchanged when the field is
+ * null, otherwise {@code 31 * hash + v.hashCode()}.
+ */
+ protected int hashCodePiece(int hash, Map<String, Object> v, boolean isNull) {
return isNull ? hash : 31 * hash + v.hashCode();
+ }
+
@Override
public int hashCode() {
return generatedCodeHashCode();
@@ -581,6 +606,13 @@ public abstract class SchemaTuple<T exte
protected abstract void generatedCodeSetDataBag(int fieldNum, DataBag val) throws ExecException;
+ /** Sets map field {@code fieldNum} by dispatching to the generated {@code generatedCodeSetMap}. */
+ @Override
+ public void setMap(int fieldNum, Map<String, Object> val) throws ExecException {
generatedCodeSetMap(fieldNum, val);
+ }
+
+ /** Supplied by the generated subclass; stores {@code val} into map field {@code fieldNum}. */
+ protected abstract void generatedCodeSetMap(int fieldNum, Map<String, Object> val) throws ExecException;
+
private void errorIfNull(boolean isNull, String type) throws FieldIsNullException {
if (isNull) {
throw new FieldIsNullException("Desired field of type ["+type+"] was null!");
@@ -632,6 +664,11 @@ public abstract class SchemaTuple<T exte
return val;
}
+ /** Returns {@code val}, or throws FieldIsNullException (via errorIfNull) when {@code isNull} is set. */
+ protected Map<String, Object> returnUnlessNull(boolean isNull, Map<String, Object> val) throws FieldIsNullException {
errorIfNull(isNull, "Map<String,Object>");
return val;
+ }
+
@Override
public int getInt(int fieldNum) throws ExecException {
return generatedCodeGetInt(fieldNum);
@@ -732,6 +769,18 @@ public abstract class SchemaTuple<T exte
return (DataBag)val;
}
+ /** Returns map field {@code fieldNum} by dispatching to the generated {@code generatedCodeGetMap}. */
+ @Override
+ public Map<String, Object> getMap(int fieldNum) throws ExecException {
return generatedCodeGetMap(fieldNum);
+ }
+
+ /** Supplied by the generated subclass; reads map field {@code fieldNum}. */
+ protected abstract Map<String, Object> generatedCodeGetMap(int fieldNum) throws ExecException;
+
+ /** Unchecked cast of an untyped value to a map; no runtime validation of key/value types. */
+ @SuppressWarnings("unchecked")
+ protected Map<String, Object> unboxMap(Object val) {
return (Map<String, Object>)val;
+ }
+
protected static Schema staticSchemaGen(String s) {
try {
if (s.equals("")) {
@@ -1032,6 +1081,33 @@ public abstract class SchemaTuple<T exte
return compare(isNull, val, themNull, themVal);
}
+ /**
+ * Null-aware map comparison: two nulls compare equal, a null sorts before a non-null,
+ * otherwise delegates to {@code compare(Map, Map)}.
+ */
+ protected int compare(boolean usNull, Map<String, Object> usVal, boolean themNull, Map<String, Object> themVal) {
+ if (usNull && themNull) {
return 0;
+ } else if (themNull) {
return 1;
+ } else if (usNull) {
return -1;
+ }
return compare(usVal, themVal);
+ }
+
+ /** Compares two non-null maps using {@code DataType.compare} with {@code DataType.MAP} for both sides. */
+ protected int compare(Map<String, Object> val, Map<String, Object> themVal) {
return DataType.compare(val, themVal, DataType.MAP, DataType.MAP);
+ }
+
+ /**
+ * Compares this tuple's map field ({@code val}, {@code isNull}) with the map field at
+ * {@code pos} of {@code t}, ordering nulls via the null-aware compare overload.
+ *
+ * @throws RuntimeException wrapping the ExecException if the field of {@code t} cannot be read
+ */
+ protected int compareWithElementAtPos(boolean isNull, Map<String, Object> val, SchemaTuple<?> t, int pos) {
+ Map<String, Object> themVal;
+ boolean themNull;
+ try {
+ // Check nullness first: typed getters signal a null field by throwing FieldIsNullException.
themNull = t.isNull(pos);
themVal = themNull ? null : t.getMap(pos);
+ } catch (ExecException e) {
throw new RuntimeException("Unable to retrieve Map field " + pos + " in given Tuple: " + t, e);
+ }
return compare(isNull, val, themNull, themVal);
+ }
+
protected int compareWithElementAtPos(boolean isNull, SchemaTuple<?> val, SchemaTuple<?> t, int pos) {
Object themVal;
boolean themNull;
@@ -1067,16 +1143,6 @@ public abstract class SchemaTuple<T exte
public abstract A make();
}
- @NotImplemented
- public Map<String,Object> getMap(int idx) throws ExecException {
- throw MethodHelper.methodNotImplemented();
- }
-
- @NotImplemented
- public void setMap(int idx, Map<String,Object> val) throws ExecException {
- throw MethodHelper.methodNotImplemented();
- }
-
public int size() {
return generatedCodeSize();
}
Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java Thu Aug 9 00:48:16 2012
@@ -723,17 +723,23 @@ public class SchemaTupleClassGenerator {
} else if (isLong() || isDouble()) {
size += 8;
} else if (isBytearray()) {
- s += "(pos_"+fieldPos+" == null ? 8 : SizeUtil.roundToEight(12 + pos_"+fieldPos+".length) * 8) + ";
- } else if (isString()) {
- s += "(pos_"+fieldPos+" == null ? 8 : SizeUtil.getPigObjMemSize(pos_"+fieldPos+")) + ";
+ size += 8; //the ptr
+ s += "(pos_"+fieldPos+" == null ? 0 : SizeUtil.roundToEight(12 + pos_"+fieldPos+".length) * 8) + ";
} else if (isBoolean()) {
if (booleans++ % 8 == 0) {
size++; //accounts for the byte used to store boolean values
}
} else if (isBag()) {
- //TODO IMPLEMENT
- } else {
+ size += 8; //the ptr
+ s += "(pos_"+fieldPos+" == null ? 0 : pos_"+fieldPos+".getMemorySize()) + ";
+ } else if (isMap() || isString()) {
+ size += 8; //the ptr
+ s += "(pos_"+fieldPos+" == null ? 0 : SizeUtil.getPigObjMemSize(pos_"+fieldPos+")) + ";
+ } else if (isTuple()) {
+ size += 8; //the ptr
s += "(pos_"+fieldPos+" == null ? 8 : pos_"+fieldPos+".getMemorySize()) + ";
+ } else {
+ throw new RuntimeException("Unsupported type found: " + fs);
}
if (isPrimitive() && primitives++ % 8 == 0) {
@@ -762,6 +768,8 @@ public class SchemaTupleClassGenerator {
case (DataType.CHARARRAY): add(" return (String)null;"); break;
case (DataType.TUPLE): add(" return (Tuple)null;"); break;
case (DataType.BAG): add(" return (DataBag)null;"); break;
+ case (DataType.MAP): add(" return (Map<String,Object>)null;"); break;
+ default: throw new RuntimeException("Unsupported type");
}
add("}");
addBreak();
@@ -1026,6 +1034,7 @@ public class SchemaTupleClassGenerator {
listOfFutureMethods.add(new TypeAwareSetString(DataType.BOOLEAN));
listOfFutureMethods.add(new TypeAwareSetString(DataType.TUPLE));
listOfFutureMethods.add(new TypeAwareSetString(DataType.BAG));
+ listOfFutureMethods.add(new TypeAwareSetString(DataType.MAP));
listOfFutureMethods.add(new TypeAwareGetString(DataType.INTEGER));
listOfFutureMethods.add(new TypeAwareGetString(DataType.LONG));
listOfFutureMethods.add(new TypeAwareGetString(DataType.FLOAT));
@@ -1035,6 +1044,7 @@ public class SchemaTupleClassGenerator {
listOfFutureMethods.add(new TypeAwareGetString(DataType.BOOLEAN));
listOfFutureMethods.add(new TypeAwareGetString(DataType.TUPLE));
listOfFutureMethods.add(new TypeAwareGetString(DataType.BAG));
+ listOfFutureMethods.add(new TypeAwareGetString(DataType.MAP));
listOfFutureMethods.add(new ListSetString());
for (TypeInFunctionStringOut t : listOfFutureMethods) {
@@ -1051,6 +1061,7 @@ public class SchemaTupleClassGenerator {
StringBuilder head =
new StringBuilder()
.append("import java.util.List;\n")
+ .append("import java.util.Map;\n")
.append("import java.util.Iterator;\n")
.append("import java.io.DataOutput;\n")
.append("import java.io.DataInput;\n")
@@ -1156,10 +1167,6 @@ public class SchemaTupleClassGenerator {
public void prepareProcess(Schema.FieldSchema fs) {
type = fs.type;
- if (type==DataType.MAP) {
- throw new RuntimeException("Map currently not supported by SchemaTuple");
- }
-
process(fieldPos, fs);
fieldPos++;
}
@@ -1204,6 +1211,10 @@ public class SchemaTupleClassGenerator {
return type == DataType.BAG;
}
+ /** True when the field currently being processed has type {@code DataType.MAP}. */
+ public boolean isMap() {
return type == DataType.MAP;
+ }
+
public boolean isObject() {
return !isPrimitive();
}
@@ -1223,6 +1234,7 @@ public class SchemaTupleClassGenerator {
case (DataType.BOOLEAN): return "boolean";
case (DataType.TUPLE): return "Tuple";
case (DataType.BAG): return "DataBag";
+ case (DataType.MAP): return "Map";
default: throw new RuntimeException("Can't return String for given type " + DataType.findTypeName(type));
}
}
Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java Thu Aug 9 00:48:16 2012
@@ -57,10 +57,6 @@ public class SchemaTupleFactory implemen
}
for (Schema.FieldSchema fs : s.getFields()) {
- if (fs.type == DataType.MAP) {
- return false;
- }
-
if (fs.type == DataType.TUPLE && !isGeneratable(fs.schema)) {
return false;
}
Modified: pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java?rev=1371022&r1=1371021&r2=1371022&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java (original)
+++ pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java Thu Aug 9 00:48:16 2012
@@ -37,6 +37,7 @@ import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Random;
@@ -62,6 +63,8 @@ import org.apache.pig.impl.util.Utils;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Maps;
+
public class TestSchemaTuple {
private Properties props;
private Configuration conf;
@@ -171,6 +174,10 @@ public class TestSchemaTuple {
udfSchema = Utils.getSchemaFromString("int, b:bag{(int,int,int)}");
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+ isAppendable = false;
+ udfSchema = Utils.getSchemaFromString("int, m:map[(int,int,int)]");
+ SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
// this compiles and "ships"
SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
@@ -288,6 +295,11 @@ public class TestSchemaTuple {
udfSchema = Utils.getSchemaFromString("int, b:bag{(int,int,int)}");
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
+
+ isAppendable = false;
+ udfSchema = Utils.getSchemaFromString("int, m:map[(int,int,int)]");
+ tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+ putThroughPaces(tf, udfSchema, isAppendable);
}
private void putThroughPaces(SchemaTupleFactory tf, Schema udfSchema, boolean isAppendable) throws Exception {
@@ -399,6 +411,22 @@ public class TestSchemaTuple {
db.add(t);
}
return db;
+ case DataType.MAP:
+ Map<String, Object> map = Maps.newHashMap();
+ int keys = r.nextInt(10);
+ for (int i = 0; i < keys; i++) {
+ String key = (String) randData(new FieldSchema(null, DataType.CHARARRAY));
+ int values = r.nextInt(100);
+ for (int j = 0; j < values; j++) {
+ int tSz = r.nextInt(10);
+ Tuple t = TupleFactory.getInstance().newTuple(tSz);
+ for (int k = 0; k < tSz; k++) {
+ t.set(k, r.nextInt());
+ }
+ map.put(key, t);
+ }
+ }
+ return map;
default: throw new RuntimeException("Cannot generate data for given FieldSchema: " + fs);
}
}
@@ -424,6 +452,7 @@ public class TestSchemaTuple {
case DataType.DOUBLE: st.getDouble(i); break;
case DataType.TUPLE: st.getTuple(i); break;
case DataType.BAG: st.getDataBag(i); break;
+ case DataType.MAP: st.getMap(i); break;
default: throw new RuntimeException("Unsupported FieldSchema in SchemaTuple: " + fs);
}
} catch (FieldIsNullException e) {