You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/16 09:57:45 UTC
svn commit: r985819 [2/3] - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/data/ test/org/apache/pig/test/
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataByteArray.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataByteArray.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataByteArray.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataByteArray.java Mon Aug 16 07:57:44 2010
@@ -17,13 +17,8 @@
*/
package org.apache.pig.data;
-import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
-import java.lang.StringBuilder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.nio.MappedByteBuffer;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -191,33 +186,40 @@ public class DataByteArray implements Co
}
/**
- * Compare two byte arrays. Comparison is done first using byte values
- * then length. So "g" will be greater than "abcdefg", but "hello worlds"
- * is greater than "hello world". If the other object is not a
- * DataByteArray, {@link DataType#compare} will be called.
+ * Compare two byte arrays. Comparison is done first using byte values then
+ * length. So "g" will be greater than "abcdefg", but "hello worlds" is
+ * greater than "hello world". If the other object is not a DataByteArray,
+ * {@link DataType#compare} will be called.
+ *
* @param other Other object to compare to.
* @return -1 if less than, 1 if greater than, 0 if equal.
*/
public int compareTo(Object other) {
if (other instanceof DataByteArray) {
- DataByteArray dba = (DataByteArray)other;
- int mySz = mData.length;
- int tSz = dba.mData.length;
- int i;
- for (i = 0; i < mySz; i++) {
- // If the other has run out of characters, we're bigger.
- if (i >= tSz) return 1;
- if (mData[i] < dba.mData[i]) return -1;
- else if (mData[i] > dba.mData[i]) return 1;
- }
- // If the other still has characters left, it's greater
- if (i < tSz) return -1;
- return 0;
+ DataByteArray dba = (DataByteArray) other;
+ return compare(mData, dba.mData);
} else {
return DataType.compare(this, other);
}
}
+ public static int compare(byte[] b1, byte[] b2) {
+ int i;
+ for (i = 0; i < b1.length; i++) {
+ // If the other has run out of characters, we're bigger.
+ if (i >= b2.length)
+ return 1;
+ if (b1[i] < b2[i])
+ return -1;
+ else if (b1[i] > b2[i])
+ return 1;
+ }
+ // If the other still has characters left, it's greater
+ if (i < b2.length)
+ return -1;
+ return 0;
+ }
+
@Override
public boolean equals(Object other) {
return (compareTo(other) == 0);
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java Mon Aug 16 07:57:44 2010
@@ -46,7 +46,7 @@ public class DataReaderWriter {
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
private static BagFactory mBagFactory = BagFactory.getInstance();
static final int UNSIGNED_SHORT_MAX = 65535;
- static final String UTF8 = "UTF-8";
+ public static final String UTF8 = "UTF-8";
public static Tuple bytesToTuple(DataInput in) throws IOException {
// Don't use Tuple.readFields, because it requires you to
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java Mon Aug 16 07:57:44 2010
@@ -17,86 +17,101 @@
*/
package org.apache.pig.data;
+import java.io.ByteArrayInputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
-import java.lang.StringBuilder;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.List;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.TupleFormat;
/**
- * A default implementation of Tuple. This class will be created by the
- * DefaultTupleFactory.
+ * A default implementation of Tuple. This class will be created by the DefaultTupleFactory.
*/
public class DefaultTuple implements Tuple {
-
+
protected boolean isNull = false;
private static final long serialVersionUID = 2L;
protected List<Object> mFields;
-
+
/**
- * Default constructor. This constructor is public so that hadoop can call
- * it directly. However, inside pig you should never be calling this
- * function. Use TupleFactory instead.
+ * Default constructor. This constructor is public so that hadoop can call it directly. However, inside pig you
+ * should never be calling this function. Use TupleFactory instead.
*/
public DefaultTuple() {
mFields = new ArrayList<Object>();
}
/**
- * Construct a tuple with a known number of fields. Package level so
- * that callers cannot directly invoke it.
- * @param size Number of fields to allocate in the tuple.
+ * Construct a tuple with a known number of fields. Package level so that callers cannot directly invoke it.
+ *
+ * @param size
+ * Number of fields to allocate in the tuple.
*/
DefaultTuple(int size) {
mFields = new ArrayList<Object>(size);
- for (int i = 0; i < size; i++) mFields.add(null);
+ for (int i = 0; i < size; i++)
+ mFields.add(null);
}
/**
- * Construct a tuple from an existing list of objects. Package
- * level so that callers cannot directly invoke it.
- * @param c List of objects to turn into a tuple.
+ * Construct a tuple from an existing list of objects. Package level so that callers cannot directly invoke it.
+ *
+ * @param c
+ * List of objects to turn into a tuple.
*/
DefaultTuple(List<Object> c) {
mFields = new ArrayList<Object>(c.size());
Iterator<Object> i = c.iterator();
int field;
- for (field = 0; i.hasNext(); field++) mFields.add(field, i.next());
+ for (field = 0; i.hasNext(); field++)
+ mFields.add(field, i.next());
}
/**
- * Construct a tuple from an existing list of objects. Package
- * level so that callers cannot directly invoke it.
- * @param c List of objects to turn into a tuple. This list will be kept
- * as part of the tuple.
- * @param junk Just used to differentiate from the constructor above that
- * copies the list.
+ * Construct a tuple from an existing list of objects. Package level so that callers cannot directly invoke it.
+ *
+ * @param c
+ * List of objects to turn into a tuple. This list will be kept as part of the tuple.
+ * @param junk
+ * Just used to differentiate from the constructor above that copies the list.
*/
DefaultTuple(List<Object> c, int junk) {
mFields = c;
}
-
/**
- * Make this tuple reference the contents of another. This method does not copy
- * the underlying data. It maintains references to the data from the original
- * tuple (and possibly even to the data structure holding the data).
- * @param t Tuple to reference.
+ * Make this tuple reference the contents of another. This method does not copy the underlying data. It maintains
+ * references to the data from the original tuple (and possibly even to the data structure holding the data).
+ *
+ * @param t
+ * Tuple to reference.
*/
public void reference(Tuple t) {
mFields = t.getAll();
}
/**
- * Find the size of the tuple. Used to be called arity().
+ * Find the size of the tuple. Used to be called arity().
+ *
* @return number of fields in the tuple.
*/
public int size() {
@@ -105,10 +120,12 @@ public class DefaultTuple implements Tup
/**
* Find out if a given field is null.
- * @param fieldNum Number of field to check for null.
+ *
+ * @param fieldNum
+ * Number of field to check for null.
* @return true if the field is null, false otherwise.
- * @throws ExecException if the field number given is greater
- * than or equal to the number of fields in the tuple.
+ * @throws ExecException
+ * if the field number given is greater than or equal to the number of fields in the tuple.
*/
public boolean isNull(int fieldNum) throws ExecException {
return (mFields.get(fieldNum) == null);
@@ -116,12 +133,13 @@ public class DefaultTuple implements Tup
/**
* Find the type of a given field.
- * @param fieldNum Number of field to get the type for.
- * @return type, encoded as a byte value. The values are taken from
- * the class DataType. If the field is null, then DataType.UNKNOWN
- * will be returned.
- * @throws ExecException if the field number is greater than or equal to
- * the number of fields in the tuple.
+ *
+ * @param fieldNum
+ * Number of field to get the type for.
+ * @return type, encoded as a byte value. The values are taken from the class DataType. If the field is null, then
+ * DataType.UNKNOWN will be returned.
+ * @throws ExecException
+ * if the field number is greater than or equal to the number of fields in the tuple.
*/
public byte getType(int fieldNum) throws ExecException {
return DataType.findType(mFields.get(fieldNum));
@@ -129,10 +147,12 @@ public class DefaultTuple implements Tup
/**
* Get the value in a given field.
- * @param fieldNum Number of the field to get the value for.
+ *
+ * @param fieldNum
+ * Number of the field to get the value for.
* @return value, as an Object.
- * @throws ExecException if the field number is greater than or equal to
- * the number of fields in the tuple.
+ * @throws ExecException
+ * if the field number is greater than or equal to the number of fields in the tuple.
*/
public Object get(int fieldNum) throws ExecException {
return mFields.get(fieldNum);
@@ -140,8 +160,8 @@ public class DefaultTuple implements Tup
/**
* Get all of the fields in the tuple as a list.
- * @return List<Object> containing the fields of the tuple
- * in order.
+ *
+ * @return List<Object> containing the fields of the tuple in order.
*/
public List<Object> getAll() {
return mFields;
@@ -149,49 +169,50 @@ public class DefaultTuple implements Tup
/**
* Set the value in a given field.
- * @param fieldNum Number of the field to set the value for.
- * @param val Object to put in the indicated field.
- * @throws ExecException if the field number is greater than or equal to
- * the number of fields in the tuple.
+ *
+ * @param fieldNum
+ * Number of the field to set the value for.
+ * @param val
+ * Object to put in the indicated field.
+ * @throws ExecException
+ * if the field number is greater than or equal to the number of fields in the tuple.
*/
public void set(int fieldNum, Object val) throws ExecException {
mFields.set(fieldNum, val);
}
/**
- * Append a field to a tuple. This method is not efficient as it may
- * force copying of existing data in order to grow the data structure.
- * Whenever possible you should construct your Tuple with the
- * newTuple(int) method and then fill in the values with set(), rather
- * than construct it with newTuple() and append values.
- * @param val Object to append to the tuple.
+ * Append a field to a tuple. This method is not efficient as it may force copying of existing data in order to grow
+ * the data structure. Whenever possible you should construct your Tuple with the newTuple(int) method and then fill
+ * in the values with set(), rather than construct it with newTuple() and append values.
+ *
+ * @param val
+ * Object to append to the tuple.
*/
public void append(Object val) {
mFields.add(val);
}
/**
- * Determine the size of tuple in memory. This is used by data bags
- * to determine their memory size. This need not be exact, but it
- * should be a decent estimation.
+ * Determine the size of tuple in memory. This is used by data bags to determine their memory size. This need not be
+ * exact, but it should be a decent estimation.
+ *
* @return estimated memory size.
*/
public long getMemorySize() {
Iterator<Object> i = mFields.iterator();
- //fixed overhead
- long empty_tuple_size = 8 /* tuple object header*/
+ // fixed overhead
+ long empty_tuple_size = 8 /* tuple object header */
+ 8 /* isNull - but rounded to 8 bytes as total obj size needs to be multiple of 8 */
- + 8 /* mFields reference*/
- + 32 /* mFields array list fixed size*/;
+ + 8 /* mFields reference */
+ + 32 /* mFields array list fixed size */;
-
- //rest of the fixed portion of mfields size is accounted within empty_tuple_size
- long mfields_var_size = roundToEight(4 + 4*mFields.size());
+ // rest of the fixed portion of mfields size is accounted within empty_tuple_size
+ long mfields_var_size = roundToEight(4 + 4 * mFields.size());
// in java hotspot 32bit vm, there seems to be a minimum tuple size of 96
// which is probably from the minimum size of this array list
- mfields_var_size = Math.max(40, mfields_var_size);
-
-
+ mfields_var_size = Math.max(40, mfields_var_size);
+
long sum = empty_tuple_size + mfields_var_size;
while (i.hasNext()) {
sum += getFieldMemorySize(i.next());
@@ -199,23 +220,24 @@ public class DefaultTuple implements Tup
return sum;
}
-
-
/**
* Memory size of objects are rounded to multiple of 8 bytes
+ *
* @param i
- * @return i rounded to a equal of higher multiple of 8
+ * @return i rounded to a equal of higher multiple of 8
*/
private long roundToEight(long i) {
- return 8 * ((i+7)/8); // integer division rounds the result down
+ return 8 * ((i + 7) / 8); // integer division rounds the result down
}
- /**
- * Write a tuple of atomic values into a string. All values in the
- * tuple must be atomic (no bags, tuples, or maps).
- * @param delim Delimiter to use in the string.
+ /**
+ * Write a tuple of atomic values into a string. All values in the tuple must be atomic (no bags, tuples, or maps).
+ *
+ * @param delim
+ * Delimiter to use in the string.
* @return A string containing the tuple.
- * @throws ExecException if a non-atomic value is found.
+ * @throws ExecException
+ * if a non-atomic value is found.
*/
public String toDelimitedString(String delim) throws ExecException {
StringBuilder buf = new StringBuilder();
@@ -228,15 +250,14 @@ public class DefaultTuple implements Tup
return buf.toString();
}
-
@Override
public String toString() {
- return TupleFormat.format(this);
+ return TupleFormat.format(this);
}
public int compareTo(Object other) {
if (other instanceof Tuple) {
- Tuple t = (Tuple)other;
+ Tuple t = (Tuple) other;
int mySz = mFields.size();
int tSz = t.size();
if (tSz < mySz) {
@@ -261,6 +282,240 @@ public class DefaultTuple implements Tup
}
}
+    /**
+     * Raw comparator for tuples serialized by {@link DefaultTuple}. Fields of the common scalar types, chararrays,
+     * bytearrays and nested tuples are compared directly on the serialized bytes. When a field type cannot be
+     * compared that way (for example bags or maps), both complete records are deserialized and compared as objects.
+     * <p>
+     * The sort order comes from the "pig.sortOrder" job property: a single entry applies to the whole tuple,
+     * otherwise there is one entry per top-level column. Ascending order is used until {@link #setConf} supplies a
+     * JobConf.
+     */
+    public static class DefaultTupleRawComparator extends WritableComparator implements TupleRawComparator {
+        private final Log mLog = LogFactory.getLog(getClass());
+        // Sort order: one entry for the whole tuple, or one per top-level column. Defaults to ascending.
+        private boolean[] mAsc = new boolean[] { true };
+        // True when mAsc holds a single entry that applies to the whole tuple.
+        private boolean mWholeTuple = true;
+        // True if the last raw comparison met a top-level field that was null on both sides.
+        private boolean mHasNullField;
+        // True if the current raw scan met a field type it cannot compare byte-wise.
+        private boolean mNeedsDeserialization;
+        private TupleFactory mFact;
+
+        public DefaultTupleRawComparator() {
+            super(DefaultTuple.class);
+        }
+
+        @Override
+        public Configuration getConf() {
+            return null;
+        }
+
+        /**
+         * Reads the sort order from the "pig.sortOrder" property of a JobConf. Any other configuration (or null) is
+         * logged and ignored, leaving the current sort order in place.
+         *
+         * @param conf configuration, expected to be a JobConf.
+         */
+        @Override
+        public void setConf(Configuration conf) {
+            if (!(conf instanceof JobConf)) {
+                // conf may be null here; report it without throwing an NPE
+                String actual = (conf == null) ? "null" : conf.getClass().getName();
+                mLog.warn("Expected jobconf in setConf, got " + actual);
+                return;
+            }
+            JobConf jconf = (JobConf) conf;
+            try {
+                mAsc = (boolean[]) ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+            } catch (IOException ioe) {
+                mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
+                throw new RuntimeException(ioe);
+            }
+            if (mAsc == null) {
+                mAsc = new boolean[1];
+                mAsc[0] = true;
+            }
+            // If there's only one entry in mAsc, it means it's for the whole
+            // tuple. So we can't be looking for each column.
+            mWholeTuple = (mAsc.length == 1);
+            mFact = TupleFactory.getInstance();
+        }
+
+        @Override
+        public boolean hasComparedTupleNull() {
+            return mHasNullField;
+        }
+
+        /**
+         * Compare two DefaultTuples as raw bytes. We assume the Tuples are NOT PigNullableWritable, so client classes
+         * need to deal with Null and Index.
+         */
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Per-comparison state is reset here only, so nested tuple scans cannot clobber it.
+            mHasNullField = false;
+            mNeedsDeserialization = false;
+            ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1);
+            ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2);
+            int rc = compareDefaultTuple(bb1, bb2, true); // FIXME adjust for secondary sort asc
+            if (mNeedsDeserialization) {
+                // The raw scan stopped part-way through a record; compare the complete records as objects.
+                rc = compareDeserialized(b1, s1, l1, b2, s2, l2);
+            }
+            return rc;
+        }
+
+        /**
+         * Compare two DefaultTuples as raw bytes, starting at each buffer's current position (the DataType.TUPLE
+         * marker). The scan stops at the first differing field, or as soon as a field needs deserialization (in which
+         * case mNeedsDeserialization is set and 0 is returned).
+         *
+         * @param topLevel true for the outermost tuple; only its columns record nulls and honour the sort order.
+         */
+        private int compareDefaultTuple(ByteBuffer bb1, ByteBuffer bb2, boolean topLevel) {
+            byte tupleType1 = bb1.get();
+            byte tupleType2 = bb2.get();
+            assert (tupleType1 == tupleType2 && tupleType1 == DataType.TUPLE);
+            // first compare sizes
+            int sz1 = bb1.getInt();
+            int sz2 = bb2.getInt();
+            if (sz1 > sz2) {
+                return 1;
+            } else if (sz1 < sz2) {
+                return -1;
+            }
+            // if sizes are the same, compare field by field
+            int rc = 0;
+            for (int i = 0; i < sz1 && rc == 0 && !mNeedsDeserialization; i++) {
+                byte dt1 = bb1.get();
+                byte dt2 = bb2.get();
+                if (dt1 == dt2) {
+                    rc = compareField(dt1, bb1, bb2, topLevel);
+                } else {
+                    // different types: order by the type codes
+                    rc = (dt1 < dt2) ? -1 : 1;
+                }
+                // The sort order describes top-level columns only; nested values compare in natural order.
+                if (rc != 0 && topLevel && !isAscending(i)) {
+                    rc = -rc;
+                }
+            }
+            return rc;
+        }
+
+        /**
+         * Compares one serialized field of the given type, reading its value from both buffers (whose positions are
+         * just past the type byte).
+         */
+        private int compareField(byte type, ByteBuffer bb1, ByteBuffer bb2, boolean topLevel) {
+            switch (type) {
+            case DataType.NULL:
+                if (topLevel) // we are scanning the top-level Tuple (original call)
+                    mHasNullField = true;
+                return 0;
+            case DataType.BOOLEAN:
+            case DataType.BYTE: {
+                byte bv1 = bb1.get();
+                byte bv2 = bb2.get();
+                return (bv1 < bv2 ? -1 : (bv1 == bv2 ? 0 : 1));
+            }
+            case DataType.INTEGER: {
+                int iv1 = bb1.getInt();
+                int iv2 = bb2.getInt();
+                return (iv1 < iv2 ? -1 : (iv1 == iv2 ? 0 : 1));
+            }
+            case DataType.LONG: {
+                long lv1 = bb1.getLong();
+                long lv2 = bb2.getLong();
+                return (lv1 < lv2 ? -1 : (lv1 == lv2 ? 0 : 1));
+            }
+            case DataType.FLOAT:
+                return Float.compare(bb1.getFloat(), bb2.getFloat());
+            case DataType.DOUBLE:
+                return Double.compare(bb1.getDouble(), bb2.getDouble());
+            case DataType.BYTEARRAY: {
+                byte[] ba1 = new byte[bb1.getInt()];
+                byte[] ba2 = new byte[bb2.getInt()];
+                bb1.get(ba1);
+                bb2.get(ba2);
+                return DataByteArray.compare(ba1, ba2);
+            }
+            case DataType.CHARARRAY:
+            case DataType.BIGCHARARRAY: {
+                // CHARARRAY lengths are stored as an unsigned short (see DataReaderWriter.UNSIGNED_SHORT_MAX), so
+                // the sign bit must be masked off; BIGCHARARRAY lengths are stored as an int.
+                int casz1 = (type == DataType.CHARARRAY) ? (bb1.getShort() & 0xFFFF) : bb1.getInt();
+                int casz2 = (type == DataType.CHARARRAY) ? (bb2.getShort() & 0xFFFF) : bb2.getInt();
+                byte[] ca1 = new byte[casz1];
+                byte[] ca2 = new byte[casz2];
+                bb1.get(ca1);
+                bb2.get(ca2);
+                try {
+                    String str1 = new String(ca1, DataReaderWriter.UTF8);
+                    String str2 = new String(ca2, DataReaderWriter.UTF8);
+                    return str1.compareTo(str2);
+                } catch (UnsupportedEncodingException uee) {
+                    mLog.warn("Unsupported string encoding", uee);
+                    return 0;
+                }
+            }
+            case DataType.TUPLE:
+                // put back the cursor to before DataType.TUPLE so the nested scan reads its own header
+                bb1.position(bb1.position() - 1);
+                bb2.position(bb2.position() - 1);
+                return compareDefaultTuple(bb1, bb2, false);
+            default:
+                mLog.info("Unsupported DataType for binary comparison, switching to object deserialization: "
+                        + DataType.genTypeToNameMap().get(type) + "(" + type + ")");
+                // Signal the top-level compare() to deserialize the complete records.
+                mNeedsDeserialization = true;
+                return 0;
+            }
+        }
+
+        /**
+         * Deserializes both complete records (exactly the byte ranges handed to compare()) and compares them as
+         * tuples. mHasNullField is recomputed from the deserialized values.
+         */
+        private int compareDeserialized(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            TupleFactory fact = (mFact != null) ? mFact : TupleFactory.getInstance();
+            Tuple t1 = fact.newTuple();
+            Tuple t2 = fact.newTuple();
+            try {
+                t1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
+                t2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
+            } catch (IOException ioe) {
+                mLog.error("Unable to instantiate tuples for comparison: " + ioe.getMessage());
+                throw new RuntimeException(ioe.getMessage(), ioe);
+            }
+            int rc = compareTuple(t1, t2);
+            // Equal tuples have nulls in the same positions, so checking one side is enough.
+            mHasNullField = (rc == 0) && hasNullField(t1);
+            return rc;
+        }
+
+        /** Returns true if any top-level field of the tuple is null. */
+        private static boolean hasNullField(Tuple t) {
+            try {
+                for (int i = 0; i < t.size(); i++) {
+                    if (t.get(i) == null)
+                        return true;
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException("Unable to compare tuples", e);
+            }
+            return false;
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            NullableTuple nt1 = (NullableTuple) o1;
+            NullableTuple nt2 = (NullableTuple) o2;
+            int rc = 0;
+
+            // if either are null, handle differently
+            if (!nt1.isNull() && !nt2.isNull()) {
+                rc = compareTuple((Tuple) nt1.getValueAsPigType(), (Tuple) nt2.getValueAsPigType());
+            } else {
+                // for sorting purposes two nulls are equal
+                if (nt1.isNull() && nt2.isNull())
+                    rc = 0;
+                else if (nt1.isNull())
+                    rc = -1;
+                else
+                    rc = 1;
+                if (mWholeTuple && !mAsc[0])
+                    rc = -rc;
+            }
+            return rc;
+        }
+
+        /** Compares two deserialized tuples: by size first, then field by field honouring the sort order. */
+        private int compareTuple(Tuple t1, Tuple t2) {
+            int sz1 = t1.size();
+            int sz2 = t2.size();
+            if (sz2 < sz1) {
+                return 1;
+            } else if (sz2 > sz1) {
+                return -1;
+            }
+            for (int i = 0; i < sz1; i++) {
+                try {
+                    int c = DataType.compare(t1.get(i), t2.get(i));
+                    if (c != 0) {
+                        return isAscending(i) ? c : -c;
+                    }
+                } catch (ExecException e) {
+                    throw new RuntimeException("Unable to compare tuples", e);
+                }
+            }
+            return 0;
+        }
+
+        /** Returns whether the given top-level column sorts ascending, honouring a whole-tuple sort order. */
+        private boolean isAscending(int field) {
+            return mWholeTuple ? mAsc[0] : mAsc[field];
+        }
+    }
+
@Override
public boolean equals(Object other) {
return (compareTo(other) == 0);
@@ -290,13 +545,12 @@ public class DefaultTuple implements Tup
public void readFields(DataInput in) throws IOException {
// Clear our fields, in case we're being reused.
mFields.clear();
-
+
// Make sure it's a tuple.
byte b = in.readByte();
if (b != DataType.TUPLE) {
int errCode = 2112;
- String msg = "Unexpected data while reading tuple " +
- "from binary file.";
+ String msg = "Unexpected data while reading tuple " + "from binary file.";
throw new ExecException(msg, errCode, PigException.BUG);
}
// Read the number of fields
@@ -315,71 +569,70 @@ public class DefaultTuple implements Tup
// 12 is added to each to account for the object overhead and the
// pointer in the tuple.
switch (DataType.findType(o)) {
- case DataType.BYTEARRAY: {
- byte[] bytes = ((DataByteArray)o).get();
- // bytearray size including rounding to 8 bytes
- long byte_array_sz = roundToEight(bytes.length + 12);
-
- return byte_array_sz + 16 /*16 is additional size of DataByteArray */;
- }
+ case DataType.BYTEARRAY: {
+ byte[] bytes = ((DataByteArray) o).get();
+ // bytearray size including rounding to 8 bytes
+ long byte_array_sz = roundToEight(bytes.length + 12);
- case DataType.CHARARRAY: {
- String s = (String)o;
- // See PIG-1443 for a reference for this formula
- return roundToEight((s.length() * 2) + 38);
- }
+ return byte_array_sz + 16 /* 16 is additional size of DataByteArray */;
+ }
- case DataType.TUPLE: {
- Tuple t = (Tuple)o;
- return t.getMemorySize();
- }
+ case DataType.CHARARRAY: {
+ String s = (String) o;
+ // See PIG-1443 for a reference for this formula
+ return roundToEight((s.length() * 2) + 38);
+ }
- case DataType.BAG: {
- DataBag b = (DataBag)o;
- return b.getMemorySize();
- }
+ case DataType.TUPLE: {
+ Tuple t = (Tuple) o;
+ return t.getMemorySize();
+ }
- case DataType.INTEGER:
- return 4 + 8 + 4/*+4 to round to 8 bytes*/;
+ case DataType.BAG: {
+ DataBag b = (DataBag) o;
+ return b.getMemorySize();
+ }
- case DataType.LONG:
- return 8 + 8;
+ case DataType.INTEGER:
+ return 4 + 8 + 4/* +4 to round to 8 bytes */;
- case DataType.MAP: {
- Map<String, Object> m = (Map<String, Object>)o;
- Iterator<Map.Entry<String, Object> > i =
- m.entrySet().iterator();
- long sum = 0;
- while (i.hasNext()) {
- Map.Entry<String, Object> entry = i.next();
- sum += getFieldMemorySize(entry.getKey());
- sum += getFieldMemorySize(entry.getValue());
- }
- //based on experiments on 32 bit Java HotSpot VM
- // size of map with 0 entries is 120 bytes
- // each additional entry have around 24 bytes overhead at
- // small number of entries. At larger number of entries, the
- // overhead is around 32 bytes, probably because of the expanded
- // data structures in anticapation of more entries being added
- return sum + m.size()*32 + 120;
- }
+ case DataType.LONG:
+ return 8 + 8;
- case DataType.FLOAT:
- return 4 + 8 + 4/*+4 to round to 8 bytes*/;
+ case DataType.MAP: {
+ Map<String, Object> m = (Map<String, Object>) o;
+ Iterator<Map.Entry<String, Object>> i = m.entrySet().iterator();
+ long sum = 0;
+ while (i.hasNext()) {
+ Map.Entry<String, Object> entry = i.next();
+ sum += getFieldMemorySize(entry.getKey());
+ sum += getFieldMemorySize(entry.getValue());
+ }
+ // based on experiments on 32 bit Java HotSpot VM
+ // size of map with 0 entries is 120 bytes
+ // each additional entry have around 24 bytes overhead at
+ // small number of entries. At larger number of entries, the
+ // overhead is around 32 bytes, probably because of the expanded
+ // data structures in anticapation of more entries being added
+ return sum + m.size() * 32 + 120;
+ }
- case DataType.DOUBLE:
- return 8 + 8;
+ case DataType.FLOAT:
+ return 4 + 8 + 4/* +4 to round to 8 bytes */;
- case DataType.BOOLEAN:
- //boolean takes 1 byte , +7 to round it to 8
- return 1 + 8 + 7;
-
- case DataType.NULL:
- return 0;
+ case DataType.DOUBLE:
+ return 8 + 8;
- default:
- // ??
- return 12;
+ case DataType.BOOLEAN:
+ // boolean takes 1 byte , +7 to round it to 8
+ return 1 + 8 + 7;
+
+ case DataType.NULL:
+ return 0;
+
+ default:
+ // ??
+ return 12;
}
}
@@ -391,11 +644,14 @@ public class DefaultTuple implements Tup
}
/**
- * @param isNull boolean indicating whether this tuple is null
+ * @param isNull
+ * boolean indicating whether this tuple is null
*/
public void setNull(boolean isNull) {
this.isNull = isNull;
}
-
+
+ /**
+ * Returns the raw comparator class that compares DefaultTuples directly in their serialized form.
+ *
+ * @return the {@link DefaultTupleRawComparator} class.
+ */
+ public static Class<? extends TupleRawComparator> getComparatorClass() {
+ return DefaultTupleRawComparator.class;
+ }
}
-
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java Mon Aug 16 07:57:44 2010
@@ -33,5 +33,8 @@ package org.apache.pig.data;
*/
@Deprecated
public class DefaultTupleFactory extends BinSedesTupleFactory {
-
+ /**
+ * Overrides the parent factory's choice so that tuples are sorted with the raw comparator supplied by
+ * {@link DefaultTuple#getComparatorClass()}.
+ *
+ * @return the raw comparator class for DefaultTuples.
+ */
+ @Override
+ public Class<? extends TupleRawComparator> tupleRawComparatorClass() {
+ return DefaultTuple.getComparatorClass();
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java Mon Aug 16 07:57:44 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DefaultTuple.DefaultTupleRawComparator;
/**
* A class to handle reading and writing of intermediate results of data
@@ -76,5 +77,7 @@ public interface InterSedes {
*/
public void writeDatum(DataOutput out, Object val)
throws IOException;
+
+ /**
+ * Returns the raw comparator class to use for tuples serialized by this InterSedes. The returned class is
+ * expected to know the serialization details, so it can compare tuples without fully deserializing them.
+ *
+ * @return class implementing {@link TupleRawComparator}.
+ */
+ public Class<? extends TupleRawComparator> getTupleRawComparatorClass();
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java Mon Aug 16 07:57:44 2010
@@ -23,6 +23,8 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleDefaultRawComparator;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -124,13 +126,12 @@ public abstract class TupleFactory {
/**
* Return the actual class representing a tuple that the implementing
- * factory will be returning. This is needed because hadoop needs
+ * factory will be returning. This is needed because Hadoop needs
* to know the exact class we will be using for input and output.
* @return Class that implements tuple.
*/
- public abstract Class tupleClass();
-
-
+ public abstract Class<? extends Tuple> tupleClass();
+
protected TupleFactory() {
}
@@ -141,6 +142,18 @@ public abstract class TupleFactory {
public static void resetSelf() {
gSelf = null;
}
+
+ /**
+ * Return the actual class implementing the raw comparator for tuples
+ * that the factory will be returning. Override this to allow Hadoop to
+ * speed up tuple sorting. The actual returned class should know the
+ * serialization details for the tuple. The default implementation
+ * (PigTupleDefaultRawComparator) deserializes the tuples before comparing them.
+ *
+ * @return Class that implements tuple raw comparator.
+ */
+ public Class<? extends TupleRawComparator> tupleRawComparatorClass() {
+ return PigTupleDefaultRawComparator.class;
+ }
}
Added: hadoop/pig/trunk/src/org/apache/pig/data/TupleRawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/TupleRawComparator.java?rev=985819&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/TupleRawComparator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/TupleRawComparator.java Mon Aug 16 07:57:44 2010
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * This interface is intended to compare Tuples. Tuple comparison must treat null values differently depending on
+ * the statement. Under SQL semantics nulls are not equal, but other Pig Latin statements must group nulls together.
+ * This interface therefore also reports whether the tuples compared by this comparator contained null fields. That
+ * report is meaningful only when the tuples are determined to be equal by the
+ * {@link #compare(byte[],int,int,byte[],int,int)} method.
+ *
+ */
+@SuppressWarnings("rawtypes")
+public interface TupleRawComparator extends RawComparator, Configurable {
+ /**
+ * Checks if one of the compared tuples had a null field. This method is meaningful only when
+ * {@link #compare(byte[],int,int,byte[],int,int)} has returned a zero value (i.e. tuples are determined to be
+ * equal).
+ *
+ * @return true if one of the compared tuples had a null field, false otherwise.
+ */
+ public boolean hasComparedTupleNull();
+}
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPigTupleRawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigTupleRawComparator.java?rev=985819&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigTupleRawComparator.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigTupleRawComparator.java Mon Aug 16 07:57:44 2010
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleDefaultRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verifies that {@link PigTupleSortComparator}, which compares tuples in their serialized form, orders tuples the
+ * same way as {@code NullableTuple.compareTo} does on the deserialized objects. Running {@link #main(String[])}
+ * instead performs a rough timing comparison against {@link PigTupleDefaultRawComparator}.
+ */
+public class TestPigTupleRawComparator {
+
+    private final TupleFactory tf = TupleFactory.getInstance();
+    private final PigTupleSortComparator comparator = new PigTupleSortComparator();
+    private final PigTupleDefaultRawComparator oldComparator = new PigTupleDefaultRawComparator();
+    /** Field values of {@link #prototype}; recreated by {@link #setUp()} so every test may modify it. */
+    private List<Object> list;
+    private NullableTuple prototype;
+    private final ByteArrayOutputStream baos1 = new ByteArrayOutputStream();
+    private final ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
+    private final DataOutputStream dos1 = new DataOutputStream(baos1);
+    private final DataOutputStream dos2 = new DataOutputStream(baos2);
+    private static final int TUPLE_NUMBER = (int) 1e3;
+    private static final int TIMES = (int) 1e5;
+    private static final int SEED = 123456789;
+
+    @Before
+    public void setUp() {
+        JobConf jobConf = new JobConf();
+        comparator.setConf(jobConf);
+        oldComparator.setConf(jobConf);
+        list = Arrays.<Object> asList(1f, 2, 3.0, 4L, (byte) 5, true,
+                new DataByteArray(new byte[] { 0x10, 0x2a, 0x5e }), "hello world!",
+                tf.newTuple(Arrays.<Object> asList(8.0, 9f, 10L, 11)));
+        prototype = new NullableTuple(tf.newTuple(list));
+        baos1.reset();
+        baos2.reset();
+    }
+
+    @Test
+    public void testCompareEquals() throws IOException {
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertEquals(0, res);
+    }
+
+    @Test
+    public void testCompareFloat() throws IOException {
+        list.set(0, (Float) list.get(0) - 1);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+    }
+
+    @Test
+    public void testCompareInt() throws IOException {
+        list.set(1, (Integer) list.get(1) + 1);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testCompareDouble() throws IOException {
+        list.set(2, (Double) list.get(2) + 0.1);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testCompareByte() throws IOException {
+        // the (byte) cast keeps the field a Byte; without it the sum would be boxed as an Integer
+        list.set(4, (byte) ((Byte) list.get(4) + 1));
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testCompareBoolean() throws IOException {
+        list.set(5, false);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+    }
+
+    @Test
+    public void testCompareByteArray() throws IOException {
+        list.set(6, new DataByteArray(new byte[] { 0x10, 0x1a }));
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        list.set(6, new DataByteArray(new byte[] { 0x20 }));
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testCompareCharArray() throws IOException {
+        list.set(7, "hello world!");
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertEquals(0, res);
+        list.set(7, "hello worlc!");
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        list.set(7, "hello worlz!");
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+        list.set(7, "hello");
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        list.set(7, "hello world!?");
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void compareInnerTuples() throws IOException {
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertEquals(0, res);
+        list.set(8, tf.newTuple(Arrays.<Object> asList(8.0, 9f, 10L, 12)));
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+        list.set(8, tf.newTuple(Arrays.<Object> asList(8.0, 9f, 9L, 12)));
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        list.set(8, tf.newTuple(Arrays.<Object> asList(7.0, 9f, 9L, 12)));
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        // DataType.LONG < DataType.DOUBLE
+        list.set(8, tf.newTuple(Arrays.<Object> asList(8L, 9f, 9L, 12)));
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        // object after tuple
+        list = new ArrayList<Object>(list);
+        list.add(10);
+        NullableTuple t1 = new NullableTuple(tf.newTuple(list));
+        list.set(list.size() - 1, 11);
+        NullableTuple t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res < 0);
+        // fancy tuple nesting
+        list.set(list.size() - 1, tf.newTuple(list));
+        t1 = new NullableTuple(tf.newTuple(list));
+        list.set(list.size() - 1, 10);
+        list.set(list.size() - 1, tf.newTuple(list));
+        t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res > 0);
+    }
+
+    @Test
+    public void testCompareDataBag() throws IOException {
+        list = new ArrayList<Object>(list);
+        list.add(new DefaultDataBag(Arrays.asList(tf.newTuple(Arrays.asList(0)))));
+        NullableTuple t1 = new NullableTuple(tf.newTuple(list));
+        list.set(list.size() - 1, new DefaultDataBag(Arrays.asList(tf.newTuple(Arrays.asList(1)))));
+        NullableTuple t2 = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testCompareMap() throws IOException {
+        list = new ArrayList<Object>(list);
+        list.add(Collections.singletonMap("pig", "scalability"));
+        NullableTuple t1 = new NullableTuple(tf.newTuple(list));
+        list.set(list.size() - 1, Collections.singletonMap("pig", "scalability"));
+        NullableTuple t2 = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertEquals(0, res);
+        list.set(list.size() - 1, Collections.singletonMap("pigg", "scalability"));
+        t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res < 0);
+        list.set(list.size() - 1, Collections.singletonMap("pig", "Scalability"));
+        t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        list.set(list.size() - 1, Collections.singletonMap("pii", "scalability"));
+        t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res < 0);
+        // object after map
+        list.add(107);
+        t1 = new NullableTuple(tf.newTuple(list));
+        list.set(list.size() - 1, 108);
+        t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testCompareDifferentTypes() throws IOException {
+        // DataType.INTEGER < DataType.LONG
+        list.set(3, 4);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+    }
+
+    @Test
+    public void testCompareDifferentSizes() throws IOException {
+        list = new ArrayList<Object>(list);
+        // t now has one more field than prototype; this extra bag should never reach the field comparison loop
+        list.add(new DefaultDataBag());
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testRandomTuples() throws IOException {
+        Random rand = new Random(SEED);
+        for (int i = 0; i < TUPLE_NUMBER; i++) {
+            NullableTuple t = new NullableTuple(getRandomTuple(rand));
+            int res = compareHelper(prototype, t, comparator);
+            assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        }
+    }
+
+    @Test
+    public void testSortOrder() throws IOException {
+        // prototype < t but we use inverse sort order
+        list.set(2, (Double) list.get(2) + 0.1);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        JobConf jobConf = new JobConf();
+        jobConf.set("pig.sortOrder", ObjectSerializer.serialize(new boolean[] {false}));
+        comparator.setConf(jobConf);
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(-1 * Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        // per-field flags: only field 2, the field that differs, is expected to be sorted in inverse order
+        jobConf.set("pig.sortOrder", ObjectSerializer.serialize(
+                new boolean[] {true,true,false,true,true,true,true,true,true}));
+        comparator.setConf(jobConf);
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(-1 * Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+    }
+
+    /**
+     * Builds a tuple from {@link #list} with one randomly chosen field replaced by a random value of the same type;
+     * field 8 (a nested tuple) is replaced by another tuple built by this method.
+     */
+    private Tuple getRandomTuple(Random rand) throws IOException {
+        int pos = rand.nextInt(list.size());
+        Tuple t = tf.newTuple(list);
+        switch (pos) {
+        case 0:
+            t.set(pos, rand.nextFloat());
+            break;
+        case 1:
+            t.set(pos, rand.nextInt());
+            break;
+        case 2:
+            t.set(pos, rand.nextDouble());
+            break;
+        case 3:
+            t.set(pos, rand.nextLong());
+            break;
+        case 4:
+            t.set(pos, (byte) rand.nextInt());
+            break;
+        case 5:
+            t.set(pos, rand.nextBoolean());
+            break;
+        case 6:
+            byte[] ba = new byte[3];
+            rand.nextBytes(ba);
+            t.set(pos, new DataByteArray(ba));
+            break;
+        case 7:
+            int length = rand.nextInt(15);
+            t.set(pos, randomString(length, rand));
+            break;
+        case 8:
+            t.set(pos, getRandomTuple(rand));
+            break;
+        default:
+            // fields beyond the nine created in setUp() are left unchanged
+            break;
+        }
+        return t;
+    }
+
+    /**
+     * Serializes both tuples and compares the resulting bytes with {@code rawComparator}.
+     *
+     * @return the raw comparator's result for (t1, t2)
+     */
+    private int compareHelper(NullableTuple t1, NullableTuple t2, RawComparator<?> rawComparator)
+            throws IOException {
+        try {
+            t1.write(dos1);
+            t2.write(dos2);
+            byte[] b1 = baos1.toByteArray();
+            byte[] b2 = baos2.toByteArray();
+            return rawComparator.compare(b1, 0, b1.length, b2, 0, b2.length);
+        } finally {
+            // start every comparison from empty buffers, even if serialization failed
+            baos1.reset();
+            baos2.reset();
+        }
+    }
+
+    /** Characters used by {@link #randomString(int, Random)}. */
+    private static final String ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyz!?-_ ";
+
+    private static String randomString(int length, Random rand) {
+        StringBuilder sb = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            sb.append(ALPHABET.charAt(rand.nextInt(ALPHABET.length())));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Rough micro-benchmark, not run by JUnit: times {@link #TIMES} passes over {@link #TUPLE_NUMBER} pairs of random
+     * serialized tuples, first with the raw comparator under test and then with the old one.
+     */
+    public static void main(String[] args) throws Exception {
+        Random rand = new Random(SEED);
+        TestPigTupleRawComparator test = new TestPigTupleRawComparator();
+        test.setUp();
+        byte[][] toCompare1 = new byte[TUPLE_NUMBER][];
+        byte[][] toCompare2 = new byte[TUPLE_NUMBER][];
+        for (int i = 0; i < TUPLE_NUMBER; i++) {
+            new NullableTuple(test.getRandomTuple(rand)).write(test.dos1);
+            toCompare1[i] = test.baos1.toByteArray();
+            // reset so that each array holds exactly one tuple, not every tuple written so far
+            test.baos1.reset();
+        }
+        for (int i = 0; i < TUPLE_NUMBER; i++) {
+            new NullableTuple(test.getRandomTuple(rand)).write(test.dos2);
+            toCompare2[i] = test.baos2.toByteArray();
+            test.baos2.reset();
+        }
+
+        long before = System.nanoTime();
+        for (int loop = 0; loop < TIMES; loop++) {
+            for (int i = 0; i < TUPLE_NUMBER; i++) {
+                test.comparator.compare(toCompare1[i], 0, toCompare1[i].length,
+                        toCompare2[i], 0, toCompare2[i].length);
+            }
+        }
+        long after = System.nanoTime();
+        System.out.println("Raw Version - elapsed time: " + (after - before) / 1000000L + " milliseconds");
+
+        before = System.nanoTime();
+        for (int loop = 0; loop < TIMES; loop++) {
+            for (int i = 0; i < TUPLE_NUMBER; i++) {
+                test.oldComparator.compare(toCompare1[i], 0, toCompare1[i].length,
+                        toCompare2[i], 0, toCompare2[i].length);
+            }
+        }
+        after = System.nanoTime();
+        System.out.println("Old Version - elapsed time: " + (after - before) / 1000000L + " milliseconds");
+    }
+}