You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/07/03 22:36:16 UTC
svn commit: r1356921 [4/4] - in /pig/trunk: ./ conf/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expr...
Added: pig/trunk/src/org/apache/pig/data/utils/SedesHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/utils/SedesHelper.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/utils/SedesHelper.java (added)
+++ pig/trunk/src/org/apache/pig/data/utils/SedesHelper.java Tue Jul 3 20:36:09 2012
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.data.BinInterSedes;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * Static helpers for reading and writing values in the binary layout used by
+ * {@link BinInterSedes}: length-prefixed byte arrays and chararrays, generic
+ * tuples, bit-packed boolean arrays, and variable-length integers.
+ */
+@InterfaceAudience.Private
+public class SedesHelper {
+    private static final BinInterSedes pigSerializer = new BinInterSedes();
+    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    /**
+     * Writes a type marker, the array length, then the bytes themselves. The
+     * smallest marker whose length field can hold the size is used:
+     * TINYBYTEARRAY (unsigned byte), SMALLBYTEARRAY (unsigned short) or
+     * BYTEARRAY (int).
+     *
+     * @param out destination
+     * @param buf bytes to write
+     * @throws IOException if the underlying output fails
+     */
+    public static void writeBytes(DataOutput out, byte[] buf) throws IOException {
+        int sz = buf.length;
+        if (sz < BinInterSedes.UNSIGNED_BYTE_MAX) {
+            out.writeByte(BinInterSedes.TINYBYTEARRAY);
+            out.writeByte(sz);
+        } else if (sz < BinInterSedes.UNSIGNED_SHORT_MAX) {
+            out.writeByte(BinInterSedes.SMALLBYTEARRAY);
+            out.writeShort(sz);
+        } else {
+            out.writeByte(BinInterSedes.BYTEARRAY);
+            out.writeInt(sz);
+        }
+        out.write(buf);
+    }
+
+    /**
+     * Reads a byte array written by {@link #writeBytes(DataOutput, byte[])}.
+     * The caller must already have consumed the type marker.
+     *
+     * @param in source, positioned just after the type marker
+     * @param type the marker that was read
+     * @return the bytes read
+     * @throws IOException if the underlying input fails or {@code type} is not
+     *         one of TINYBYTEARRAY, SMALLBYTEARRAY or BYTEARRAY
+     */
+    public static byte[] readBytes(DataInput in, byte type) throws IOException {
+        int sz;
+        switch (type) {
+        case BinInterSedes.TINYBYTEARRAY:
+            sz = in.readUnsignedByte();
+            break;
+        case BinInterSedes.SMALLBYTEARRAY:
+            sz = in.readUnsignedShort();
+            break;
+        case BinInterSedes.BYTEARRAY:
+            sz = in.readInt();
+            break;
+        default:
+            // An unknown marker means the stream is out of sync; silently
+            // returning an empty array would only move the failure elsewhere.
+            throw new IOException("Unexpected byte array type: " + type);
+        }
+        byte[] buf = new byte[sz];
+        in.readFully(buf);
+        return buf;
+    }
+
+    /**
+     * Writes a type marker followed by the string. Strings short enough for
+     * {@link DataOutput#writeUTF(String)} are written that way (SMALLCHARARRAY);
+     * longer ones as an int byte count plus UTF-8 bytes (CHARARRAY).
+     *
+     * @param out destination
+     * @param s string to write
+     * @throws IOException if the underlying output fails
+     */
+    public static void writeChararray(DataOutput out, String s) throws IOException {
+        // A char can take up to 3 bytes in the modified UTF-8 encoding used by
+        // DataOutput.writeUTF, whose length prefix is an unsigned short, so
+        // only strings shorter than UNSIGNED_SHORT_MAX / 3 are sure to fit.
+        if (s.length() < BinInterSedes.UNSIGNED_SHORT_MAX / 3) {
+            out.writeByte(BinInterSedes.SMALLCHARARRAY);
+            out.writeUTF(s);
+        } else {
+            byte[] utfBytes = s.getBytes(BinInterSedes.UTF8);
+            out.writeByte(BinInterSedes.CHARARRAY);
+            out.writeInt(utfBytes.length);
+            out.write(utfBytes);
+        }
+    }
+
+    /**
+     * Reads a string written by {@link #writeChararray(DataOutput, String)}.
+     * The caller must already have consumed the type marker.
+     *
+     * @param in source, positioned just after the type marker
+     * @param type SMALLCHARARRAY selects {@link DataInput#readUTF()}; any other
+     *        value is read as an int byte count followed by UTF-8 bytes
+     * @return the decoded string
+     * @throws IOException if the underlying input fails
+     */
+    public static String readChararray(DataInput in, byte type) throws IOException {
+        if (type == BinInterSedes.SMALLCHARARRAY) {
+            return in.readUTF();
+        }
+
+        int size = in.readInt();
+        byte[] buf = new byte[size];
+        in.readFully(buf);
+        return new String(buf, BinInterSedes.UTF8);
+    }
+
+    /**
+     * Writes a tuple marker sized to the field count (TINYTUPLE, SMALLTUPLE or
+     * TUPLE), the count itself, then each field via
+     * {@link BinInterSedes#writeDatum}.
+     *
+     * @param out destination
+     * @param t tuple to write
+     * @throws IOException if the underlying output or a field write fails
+     */
+    public static void writeGenericTuple(DataOutput out, Tuple t) throws IOException {
+        int sz = t.size();
+        if (sz < BinInterSedes.UNSIGNED_BYTE_MAX) {
+            out.writeByte(BinInterSedes.TINYTUPLE);
+            out.writeByte(sz);
+        } else if (sz < BinInterSedes.UNSIGNED_SHORT_MAX) {
+            out.writeByte(BinInterSedes.SMALLTUPLE);
+            out.writeShort(sz);
+        } else {
+            out.writeByte(BinInterSedes.TUPLE);
+            out.writeInt(sz);
+        }
+
+        for (int i = 0; i < sz; i++) {
+            pigSerializer.writeDatum(out, t.get(i));
+        }
+    }
+
+    /**
+     * Reads a tuple written by {@link #writeGenericTuple(DataOutput, Tuple)}.
+     * The caller must already have consumed the type marker.
+     *
+     * @param in source, positioned just after the type marker
+     * @param type the tuple marker that was read
+     * @return a new tuple created by the default {@link TupleFactory}
+     * @throws IOException if the underlying input or a field read fails
+     */
+    public static Tuple readGenericTuple(DataInput in, byte type) throws IOException {
+        int sz = pigSerializer.getTupleSize(in, type);
+
+        Tuple t = mTupleFactory.newTuple(sz);
+        for (int i = 0; i < sz; i++) {
+            t.set(i, pigSerializer.readDatum(in));
+        }
+        return t;
+    }
+
+    /**
+     * Bit-packs the elements of {@code v} followed by {@code extra}, eight
+     * values per byte. Within a byte the earliest value occupies the most
+     * significant bit in use. The result can be read back with
+     * {@link #readBooleanArray(DataInput, int)} and a size of
+     * {@code v.length + 1}.
+     *
+     * @param out destination
+     * @param v values to write
+     * @param extra value stored after the last element of {@code v}
+     * @throws IOException if the underlying output fails
+     */
+    public static void writeBooleanArray(DataOutput out, boolean[] v, boolean extra) throws IOException {
+        int len = v.length + 1;
+        for (int chunk = 0; chunk < len; chunk += 8) {
+            byte encoding = 0;
+            for (int i = chunk; i < len && i < chunk + 8; i++) {
+                encoding <<= 1;
+                if (i == v.length) {
+                    encoding += extra ? 1 : 0; // position v.length holds the extra value
+                } else {
+                    encoding += v[i] ? 1 : 0;
+                }
+            }
+            out.writeByte(encoding);
+        }
+    }
+
+    /**
+     * Bit-packs {@code v} eight values per byte, using the same layout as
+     * {@link #writeBooleanArray(DataOutput, boolean[], boolean)} but without
+     * the trailing extra value.
+     *
+     * @param out destination
+     * @param v values to write
+     * @throws IOException if the underlying output fails
+     */
+    public static void writeBooleanArray(DataOutput out, boolean[] v) throws IOException {
+        for (int chunk = 0; chunk < v.length; chunk += 8) {
+            byte encoding = 0;
+            for (int i = chunk; i < v.length && i < chunk + 8; i++) {
+                encoding <<= 1;
+                encoding += v[i] ? 1 : 0;
+            }
+            out.writeByte(encoding);
+        }
+    }
+
+    /**
+     * Reads {@code size} values bit-packed by either
+     * {@code writeBooleanArray} overload.
+     *
+     * @param in source
+     * @param size number of values to read
+     * @return the decoded values
+     * @throws IOException if the underlying input fails
+     */
+    public static boolean[] readBooleanArray(DataInput in, int size) throws IOException {
+        boolean[] v = new boolean[size];
+        for (int chunk = 0; chunk < size; chunk += 8) {
+            byte decoding = in.readByte();
+            // The last value of this chunk sits in the lowest bit, so walk the
+            // chunk backwards peeling off one bit at a time. Stop at `chunk`:
+            // going further would overwrite values decoded from earlier bytes.
+            for (int i = chunk + Math.min(7, size - chunk - 1); i >= chunk; i--) {
+                // Mask instead of `% 2`: a byte with its top bit set is
+                // negative, and a negative odd number % 2 is -1, not 1.
+                v[i] = (decoding & 1) == 1;
+                decoding >>= 1;
+            }
+        }
+        return v;
+    }
+
+    /**
+     * <p>Encodes signed and unsigned values using a common variable-length
+     * scheme, found for example in
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+     * Google's Protocol Buffers</a>. It uses fewer bytes to encode smaller values,
+     * but will use slightly more bytes to encode large values.</p>
+     *
+     * <p>Signed values are further encoded using so-called zig-zag encoding
+     * in order to make them "compatible" with variable-length encoding.</p>
+     *
+     * <p>This is taken from mahout-core, and is included to avoid having to pull
+     * in the entirety of Mahout.</p>
+     */
+    public static class Varint {
+
+        private Varint() {
+        }
+
+        /**
+         * Encodes a value using the variable-length encoding from
+         * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+         * Google Protocol Buffers</a>. It uses zig-zag encoding to efficiently
+         * encode signed values. If values are known to be nonnegative,
+         * {@link #writeUnsignedVarLong(long, DataOutput)} should be used.
+         *
+         * @param value value to encode
+         * @param out to write bytes to
+         * @throws IOException if {@link DataOutput} throws {@link IOException}
+         */
+        public static void writeSignedVarLong(long value, DataOutput out) throws IOException {
+            // Zig-zag: maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ...
+            writeUnsignedVarLong((value << 1) ^ (value >> 63), out);
+        }
+
+        /**
+         * Encodes a value using the variable-length encoding from
+         * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+         * Google Protocol Buffers</a>. Zig-zag is not used, so input must not be negative.
+         * If values can be negative, use {@link #writeSignedVarLong(long, DataOutput)}
+         * instead. This method treats negative input as like a large unsigned value.
+         *
+         * @param value value to encode
+         * @param out to write bytes to
+         * @throws IOException if {@link DataOutput} throws {@link IOException}
+         */
+        public static void writeUnsignedVarLong(long value, DataOutput out) throws IOException {
+            // 7 bits per byte, least significant group first; the high bit of
+            // each byte is set when more bytes follow.
+            while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
+                out.writeByte(((int) value & 0x7F) | 0x80);
+                value >>>= 7;
+            }
+            out.writeByte((int) value & 0x7F);
+        }
+
+        /**
+         * Zig-zag plus variable-length encoding for an int.
+         *
+         * @see #writeSignedVarLong(long, DataOutput)
+         */
+        public static void writeSignedVarInt(int value, DataOutput out) throws IOException {
+            // Zig-zag, as in writeSignedVarLong
+            writeUnsignedVarInt((value << 1) ^ (value >> 31), out);
+        }
+
+        /**
+         * Variable-length encoding for an int, without zig-zag.
+         *
+         * @see #writeUnsignedVarLong(long, DataOutput)
+         */
+        public static void writeUnsignedVarInt(int value, DataOutput out) throws IOException {
+            while ((value & 0xFFFFFF80) != 0) {
+                out.writeByte((value & 0x7F) | 0x80);
+                value >>>= 7;
+            }
+            out.writeByte(value & 0x7F);
+        }
+
+        /**
+         * @param in to read bytes from
+         * @return decode value
+         * @throws IOException if {@link DataInput} throws {@link IOException}
+         * @throws IllegalArgumentException if variable-length value does not terminate
+         *         after 10 bytes have been read
+         * @see #writeSignedVarLong(long, DataOutput)
+         */
+        public static long readSignedVarLong(DataInput in) throws IOException {
+            long raw = readUnsignedVarLong(in);
+            // This undoes the trick in writeSignedVarLong()
+            long temp = (((raw << 63) >> 63) ^ raw) >> 1;
+            // This extra step lets us deal with the largest signed values by treating
+            // negative results from read unsigned methods as like unsigned values.
+            // Must re-flip the top bit if the original read value had it set.
+            return temp ^ (raw & (1L << 63));
+        }
+
+        /**
+         * @param in to read bytes from
+         * @return decode value
+         * @throws IOException if {@link DataInput} throws {@link IOException}
+         * @throws IllegalArgumentException if variable-length value does not terminate
+         *         after 10 bytes have been read
+         * @see #writeUnsignedVarLong(long, DataOutput)
+         */
+        public static long readUnsignedVarLong(DataInput in) throws IOException {
+            long value = 0L;
+            int i = 0;
+            long b;
+            while (((b = in.readByte()) & 0x80L) != 0) {
+                value |= (b & 0x7F) << i;
+                i += 7;
+                // A long needs at most 10 bytes: 9 continuation bytes leave i == 63.
+                if (i > 63) {
+                    throw new IllegalArgumentException("Variable length quantity is too long");
+                }
+            }
+            return value | (b << i);
+        }
+
+        /**
+         * @param in to read bytes from
+         * @return decode value
+         * @throws IllegalArgumentException if variable-length value does not terminate
+         *         after 5 bytes have been read
+         * @throws IOException if {@link DataInput} throws {@link IOException}
+         * @see #readSignedVarLong(DataInput)
+         */
+        public static int readSignedVarInt(DataInput in) throws IOException {
+            int raw = readUnsignedVarInt(in);
+            // This undoes the trick in writeSignedVarInt()
+            int temp = (((raw << 31) >> 31) ^ raw) >> 1;
+            // This extra step lets us deal with the largest signed values by treating
+            // negative results from read unsigned methods as like unsigned values.
+            // Must re-flip the top bit if the original read value had it set.
+            return temp ^ (raw & (1 << 31));
+        }
+
+        /**
+         * @param in to read bytes from
+         * @return decode value
+         * @throws IllegalArgumentException if variable-length value does not terminate
+         *         after 5 bytes have been read
+         * @throws IOException if {@link DataInput} throws {@link IOException}
+         * @see #readUnsignedVarLong(DataInput)
+         */
+        public static int readUnsignedVarInt(DataInput in) throws IOException {
+            int value = 0;
+            int i = 0;
+            int b;
+            while (((b = in.readByte()) & 0x80) != 0) {
+                value |= (b & 0x7F) << i;
+                i += 7;
+                // An int needs at most 5 bytes: 4 continuation bytes leave i == 28,
+                // so a fifth continuation byte means the input is malformed.
+                if (i > 28) {
+                    throw new IllegalArgumentException("Variable length quantity is too long");
+                }
+            }
+            return value | (b << i);
+        }
+    }
+}
Added: pig/trunk/src/org/apache/pig/data/utils/StructuresHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/utils/StructuresHelper.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/utils/StructuresHelper.java (added)
+++ pig/trunk/src/org/apache/pig/data/utils/StructuresHelper.java Tue Jul 3 20:36:09 2012
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data.utils;
+
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+/**
+ * Holder types with value-based {@code equals} and {@code hashCode}: a schema
+ * wrapper that ignores aliases, and generic pairs and triples.
+ */
+public class StructuresHelper {
+    private StructuresHelper() {
+    }
+
+    /**
+     * Null-safe equality shared by {@link Pair} and {@link Triple}.
+     */
+    private static boolean nullSafeEquals(Object a, Object b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    /**
+     * This encapsulates a Schema and allows it to be used in such a way that
+     * any aliases are ignored in equality.
+     */
+    public static class SchemaKey {
+        private final Schema s;
+
+        /**
+         * @param s the schema to wrap; may be null
+         */
+        public SchemaKey(Schema s) {
+            this.s = s;
+        }
+
+        // Per-position multipliers for field hash codes, reused cyclically
+        // when a schema has more fields than entries.
+        // NOTE(review): 1133 is not prime (11 * 103) and looks like a typo for
+        // 113; it is harmless for hashing, so it is left as is.
+        private static final int[] primeList = { 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37,
+                                                 41, 43, 47, 53, 59, 61, 67, 71, 73, 79,
+                                                 83, 89, 97, 101, 103, 107, 109, 1133};
+
+        /**
+         * The hashcode logic is taken from the Schema class, including how fields
+         * are handled. The difference is that aliases are ignored.
+         */
+        @Override
+        public int hashCode() {
+            return hashCode(s);
+        }
+
+        /**
+         * Computes an alias-insensitive hash of a schema: each field contributes
+         * its type and, recursively, its inner schema, weighted by position.
+         *
+         * @param s schema to hash; may be null
+         * @return 0 for a null schema, otherwise the combined field hashes
+         */
+        public static int hashCode(Schema s) {
+            if (s == null) {
+                return 0;
+            }
+            int idx = 0;
+            int hashCode = 0;
+            for (FieldSchema fs : s.getFields()) {
+                hashCode += hashCode(fs) * (primeList[idx % primeList.length]);
+                idx++;
+            }
+            return hashCode;
+        }
+
+        private static int hashCode(FieldSchema fs) {
+            // Only the field's type and inner schema are used; its alias is not.
+            return (fs.type * 17) + ((fs.schema == null ? 0 : hashCode(fs.schema)) * 23);
+        }
+
+        /**
+         * Two keys are equal when both wrap null, or when
+         * {@code Schema.equals(s, other, false, true)} holds for their schemas.
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof SchemaKey)) {
+                return false;
+            }
+            Schema other = ((SchemaKey) o).get();
+            return (s == null && other == null) || Schema.equals(s, other, false, true);
+        }
+
+        /**
+         * @return the wrapped schema, possibly null
+         */
+        public Schema get() {
+            return s;
+        }
+
+        @Override
+        public String toString() {
+            // String.valueOf yields "null" instead of throwing for a null schema,
+            // which the constructor, hashCode and equals all accept.
+            return String.valueOf(s);
+        }
+    }
+
+    /**
+     * This is a helper class which makes it easy to have pairs of values,
+     * and to use them as keys and values in Maps. Either element may be null.
+     */
+    public static class Pair<T1, T2> {
+        private final T1 t1;
+        private final T2 t2;
+
+        public Pair(T1 t1, T2 t2) {
+            this.t1 = t1;
+            this.t2 = t2;
+        }
+
+        public T1 getFirst() {
+            return t1;
+        }
+
+        public T2 getSecond() {
+            return t2;
+        }
+
+        /**
+         * Convenience factory that lets the compiler infer the type arguments.
+         */
+        public static <A,B> Pair<A,B> make(A t1, B t2) {
+            return new Pair<A,B>(t1, t2);
+        }
+
+        @Override
+        public int hashCode() {
+            return (t1 == null ? 0 : t1.hashCode()) + (t2 == null ? 0 : 31 * t2.hashCode());
+        }
+
+        /**
+         * Pairs are equal when both elements are equal; null elements compare
+         * equal only to null.
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Pair<?,?>)) {
+                return false;
+            }
+            Pair<?,?> pr = (Pair<?,?>) o;
+            // Always compare both elements: stopping early on a null first element
+            // would make Pair(null, a) equal Pair(null, b) even though their hash
+            // codes differ.
+            return nullSafeEquals(t1, pr.getFirst()) && nullSafeEquals(t2, pr.getSecond());
+        }
+
+        @Override
+        public String toString() {
+            return "[" + t1 + "," + t2 + "]";
+        }
+    }
+
+    /**
+     * A helper class holding three values, usable as a key or value in Maps.
+     * Any element may be null.
+     */
+    public static class Triple<T1, T2, T3> {
+        private final T1 t1;
+        private final T2 t2;
+        private final T3 t3;
+
+        public Triple(T1 t1, T2 t2, T3 t3) {
+            this.t1 = t1;
+            this.t2 = t2;
+            this.t3 = t3;
+        }
+
+        public T1 getFirst() {
+            return t1;
+        }
+
+        public T2 getSecond() {
+            return t2;
+        }
+
+        public T3 getThird() {
+            return t3;
+        }
+
+        /**
+         * Convenience factory that lets the compiler infer the type arguments.
+         */
+        public static <A,B,C> Triple<A,B,C> make(A t1, B t2, C t3) {
+            return new Triple<A,B,C>(t1, t2, t3);
+        }
+
+        @Override
+        public int hashCode() {
+            return (t1 == null ? 0 : t1.hashCode())
+                    + (t2 == null ? 0 : 31 * t2.hashCode())
+                    + (t3 == null ? 0 : 527 * t3.hashCode());
+        }
+
+        /**
+         * Triples are equal when all three elements are equal; null elements
+         * compare equal only to null.
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Triple<?,?,?>)) {
+                return false;
+            }
+            Triple<?,?,?> tr = (Triple<?,?,?>) o;
+            // Compare every element: stopping at a null one would let triples that
+            // differ only in later elements compare equal despite unequal hashes.
+            return nullSafeEquals(t1, tr.getFirst())
+                    && nullSafeEquals(t2, tr.getSecond())
+                    && nullSafeEquals(t3, tr.getThird());
+        }
+
+        @Override
+        public String toString() {
+            return "[" + t1 + "," + t2 + "," + t3 + "]";
+        }
+    }
+}
Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Tue Jul 3 20:36:09 2012
@@ -242,9 +242,8 @@ public class PigContext implements Seria
* calls: addScriptFile(path, new File(path)), ensuring that a given path is
* added to the jar at most once.
* @param path
- * @throws MalformedURLException
*/
- public void addScriptFile(String path) throws MalformedURLException {
+ public void addScriptFile(String path) {
if (path != null) {
aliasedScriptFiles.put(path.replaceFirst("^/", ""), new File(path));
}
Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Tue Jul 3 20:36:09 2012
@@ -100,9 +100,7 @@ public class InterRecordReader extends R
}
if(b == -1) return false;
b = in.read();
- if(b != BinInterSedes.TINYTUPLE &&
- b != BinInterSedes.SMALLTUPLE &&
- b != BinInterSedes.TUPLE &&
+ if(!BinInterSedes.isTupleByte((byte) b) &&
b != -1) {
continue;
}
Modified: pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java Tue Jul 3 20:36:09 2012
@@ -17,6 +17,10 @@
*/
package org.apache.pig.impl.io;
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.pig.data.BinInterSedes;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -26,12 +30,9 @@ import org.apache.pig.data.TupleFactory;
public class NullableTuple extends PigNullableWritable {
private TupleFactory mFactory = null;
+ private static final BinInterSedes bis = new BinInterSedes();
public NullableTuple() {
- if (mFactory == null) {
- mFactory = TupleFactory.getInstance();
- }
- mValue = mFactory.newTuple();
}
/**
@@ -44,4 +45,15 @@ public class NullableTuple extends PigNu
public Object getValueAsPigType() {
return isNull() ? null : (Tuple)mValue;
}
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ boolean nullness = in.readBoolean();
+ setNull(nullness);
+ if (!nullness) {
+ mValue = bis.readTuple(in);
+ }
+ setIndex(in.readByte());
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Tue Jul 3 20:36:09 2012
@@ -69,7 +69,6 @@ import org.apache.pig.newplan.Dependency
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
-import org.apache.pig.newplan.logical.Util;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
@@ -537,6 +536,7 @@ public class ExpToPhyTranslationVisitor
Operator refOp = ((ScalarExpression)op).getImplicitReferencedOperator();
((POUserFunc)p).setReferencedOperator( logToPhyMap.get( refOp ) );
}
+
}
@Override
Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Tue Jul 3 20:36:09 2012
@@ -25,9 +25,10 @@ import java.util.Properties;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
-import org.apache.pig.ResourceSchema;
import org.apache.pig.builtin.Nondeterministic;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.SchemaTupleFrontend;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -193,12 +194,18 @@ public class UserFuncExpression extends
ef.setUDFContextSignature(signature);
Properties props = UDFContext.getUDFContext().getUDFProperties(ef.getClass());
- if(Util.translateSchema(inputSchema)!=null)
- props.put("pig.evalfunc.inputschema."+signature, Util.translateSchema(inputSchema));
+ Schema translatedInputSchema = Util.translateSchema(inputSchema);
+ if(translatedInputSchema != null) {
+ props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema);
+ }
// Store inputSchema into the UDF context
- ef.setInputSchema(Util.translateSchema(inputSchema));
+ ef.setInputSchema(translatedInputSchema);
+;
+ Schema udfSchema = ef.outputSchema(translatedInputSchema);
- Schema udfSchema = ef.outputSchema(Util.translateSchema(inputSchema));
+ //TODO appendability should come from a setting
+ SchemaTupleFrontend.registerToGenerateIfPossible(translatedInputSchema, false, GenContext.UDF);
+ SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, false, GenContext.UDF);
if (udfSchema != null) {
Schema.FieldSchema fs;
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Tue Jul 3 20:36:09 2012
@@ -25,8 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.Stack;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -57,6 +60,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.SchemaTupleFrontend;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
@@ -64,6 +69,8 @@ import org.apache.pig.impl.builtin.GFCro
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
@@ -84,7 +91,10 @@ import org.apache.pig.newplan.logical.ex
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.parser.SourceLocation;
+import com.google.common.collect.Lists;
+
public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor {
+ private static final Log LOG = LogFactory.getLog(LogToPhyTranslationVisitor.class);
public LogToPhyTranslationVisitor(OperatorPlan plan) throws FrontendException {
super(plan, new DependencyOrderWalker(plan));
@@ -190,7 +200,7 @@ public class LogToPhyTranslationVisitor
String scope = DEFAULT_SCOPE;
// System.err.println("Entering Filter");
POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
- .getNextNodeId(scope)), filter.getRequestedParallelisam());
+ .getNextNodeId(scope)), filter.getRequestedParallelism());
poFilter.addOriginalLocation(filter.getAlias(), filter.getLocation());
poFilter.setResultType(DataType.BAG);
currentPlan.add(poFilter);
@@ -258,14 +268,14 @@ public class LogToPhyTranslationVisitor
POSort poSort;
if (sort.getUserFunc() == null) {
poSort = new POSort(new OperatorKey(scope, nodeGen
- .getNextNodeId(scope)), sort.getRequestedParallelisam(), null,
+ .getNextNodeId(scope)), sort.getRequestedParallelism(), null,
sortPlans, sort.getAscendingCols(), null);
} else {
POUserComparisonFunc comparator = new POUserComparisonFunc(new OperatorKey(
scope, nodeGen.getNextNodeId(scope)), sort
- .getRequestedParallelisam(), null, sort.getUserFunc());
+ .getRequestedParallelism(), null, sort.getUserFunc());
poSort = new POSort(new OperatorKey(scope, nodeGen
- .getNextNodeId(scope)), sort.getRequestedParallelisam(), null,
+ .getNextNodeId(scope)), sort.getRequestedParallelism(), null,
sortPlans, sort.getAscendingCols(), comparator);
}
poSort.addOriginalLocation(sort.getAlias(), sort.getLocation());
@@ -300,7 +310,7 @@ public class LogToPhyTranslationVisitor
String scope = DEFAULT_SCOPE;
List<Operator> inputs = cross.getPlan().getPredecessors(cross);
if (cross.isNested()) {
- POCross physOp = new POCross(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam());
+ POCross physOp = new POCross(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism());
physOp.addOriginalLocation(physOp.getAlias(), physOp.getOriginalLocations());
currentPlan.add(physOp);
physOp.setResultType(DataType.BAG);
@@ -318,10 +328,10 @@ public class LogToPhyTranslationVisitor
} else {
POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
scope, nodeGen.getNextNodeId(scope)), cross
- .getRequestedParallelisam());
+ .getRequestedParallelism());
poGlobal.addOriginalLocation(cross.getAlias(), cross.getLocation());
POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
- .getNextNodeId(scope)), cross.getRequestedParallelisam());
+ .getNextNodeId(scope)), cross.getRequestedParallelism());
poGlobal.addOriginalLocation(cross.getAlias(), cross.getLocation());
currentPlan.add(poGlobal);
currentPlan.add(poPackage);
@@ -334,12 +344,12 @@ public class LogToPhyTranslationVisitor
for (Operator op : inputs) {
PhysicalPlan fep1 = new PhysicalPlan();
- ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam());
+ ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism());
ce1.setValue(inputs.size());
ce1.setResultType(DataType.INTEGER);
fep1.add(ce1);
- ConstantExpression ce2 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam());
+ ConstantExpression ce2 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism());
ce2.setValue(count);
ce2.setResultType(DataType.INTEGER);
fep1.add(ce2);
@@ -349,7 +359,7 @@ public class LogToPhyTranslationVisitor
ce1.setValue(ce1val);
ce1.setResultType(DataType.TUPLE);*/
- POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam(), Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2), new FuncSpec(GFCross.class.getName()));
+ POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism(), Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2), new FuncSpec(GFCross.class.getName()));
gfc.addOriginalLocation(cross.getAlias(), cross.getLocation());
gfc.setResultType(DataType.BAG);
fep1.addAsLeaf(gfc);
@@ -359,7 +369,7 @@ public class LogToPhyTranslationVisitor
fep1.connect(ce2, gfc);*/
PhysicalPlan fep2 = new PhysicalPlan();
- POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam());
+ POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism());
feproj.addOriginalLocation(cross.getAlias(), cross.getLocation());
feproj.setResultType(DataType.TUPLE);
feproj.setStar(true);
@@ -367,19 +377,19 @@ public class LogToPhyTranslationVisitor
fep2.add(feproj);
List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
- POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst );
+ POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
currentPlan.add(fe);
currentPlan.connect(logToPhyMap.get(op), fe);
POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
scope, nodeGen.getNextNodeId(scope)), cross
- .getRequestedParallelisam());
+ .getRequestedParallelism());
physOp.addOriginalLocation(cross.getAlias(), cross.getLocation());
List<PhysicalPlan> lrPlans = new ArrayList<PhysicalPlan>();
for(int i=0;i<inputs.size();i++){
PhysicalPlan lrp1 = new PhysicalPlan();
- POProject lrproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), i);
+ POProject lrproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), i);
lrproj1.addOriginalLocation(cross.getAlias(), cross.getLocation());
lrproj1.setOverloaded(false);
lrproj1.setResultType(DataType.INTEGER);
@@ -420,7 +430,7 @@ public class LogToPhyTranslationVisitor
List<Boolean> flattenLst = new ArrayList<Boolean>();
for(int i=1;i<=count;i++){
PhysicalPlan fep1 = new PhysicalPlan();
- POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), i);
+ POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), i);
feproj1.addOriginalLocation(cross.getAlias(), cross.getLocation());
feproj1.setResultType(DataType.BAG);
feproj1.setOverloaded(false);
@@ -429,7 +439,7 @@ public class LogToPhyTranslationVisitor
flattenLst.add(true);
}
- POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst );
+ POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
currentPlan.add(fe);
try{
@@ -596,7 +606,7 @@ public class LogToPhyTranslationVisitor
flattenList.add(fl);
}
POForEach poFE = new POForEach(new OperatorKey(scope, nodeGen
- .getNextNodeId(scope)), foreach.getRequestedParallelisam(), innerPlans, flattenList);
+ .getNextNodeId(scope)), foreach.getRequestedParallelism(), innerPlans, flattenList);
poFE.addOriginalLocation(foreach.getAlias(), foreach.getLocation());
poFE.setResultType(DataType.BAG);
logToPhyMap.put(foreach, poFE);
@@ -821,7 +831,7 @@ public class LogToPhyTranslationVisitor
}
POMergeCogroup poCogrp = new POMergeCogroup(new OperatorKey(
- DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)),inpPOs,innerLRs,relationalOp.getRequestedParallelisam());
+ DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)),inpPOs,innerLRs,relationalOp.getRequestedParallelism());
return poCogrp;
}
@@ -881,7 +891,7 @@ public class LogToPhyTranslationVisitor
boolean[] innerFlags = loj.getInnerFlags();
String alias = loj.getAlias();
SourceLocation location = loj.getLocation();
- int parallel = loj.getRequestedParallelisam();
+ int parallel = loj.getRequestedParallelism();
for (int i=0;i<inputs.size();i++) {
Operator op = inputs.get(i);
@@ -952,6 +962,27 @@ public class LogToPhyTranslationVisitor
logToPhyMap.put(loj, skj);
}
else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
+ List<Schema> inputSchemas = Lists.newArrayListWithCapacity(inputs.size());
+ List<Schema> keySchemas = Lists.newArrayListWithCapacity(inputs.size());
+
+ outer: for (int i = 0; i < inputs.size(); i++) {
+ Schema toGen = Schema.getPigSchema(new ResourceSchema(((LogicalRelationalOperator)inputs.get(i)).getSchema()));
+ // This registers the value piece
+ SchemaTupleFrontend.registerToGenerateIfPossible(toGen, false, GenContext.FR_JOIN);
+ inputSchemas.add(toGen);
+
+ Schema keyToGen = new Schema();
+ for (Byte byt : keyTypes.get(i)) {
+ // We cannot generate any nested code because that information is thrown away
+ if (byt == null || DataType.isComplex(byt.byteValue())) {
+ continue outer;
+ }
+ keyToGen.add(new FieldSchema(null, byt));
+ }
+
+ SchemaTupleFrontend.registerToGenerateIfPossible(keyToGen, false, GenContext.FR_JOIN);
+ keySchemas.add(keyToGen);
+ }
int fragment = 0;
POFRJoin pfrj;
@@ -990,8 +1021,17 @@ public class LogToPhyTranslationVisitor
}
}
- pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),parallel,
- inp, ppLists, keyTypes, null, fragment, isLeftOuter, nullTuple);
+ pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),
+ parallel,
+ inp,
+ ppLists,
+ keyTypes,
+ null,
+ fragment,
+ isLeftOuter,
+ nullTuple,
+ inputSchemas,
+ keySchemas);
pfrj.addOriginalLocation(alias, location);
} catch (ExecException e1) {
int errCode = 2058;
@@ -1017,12 +1057,46 @@ public class LogToPhyTranslationVisitor
boolean usePOMergeJoin = inputs.size() == 2 && innerFlags[0] && innerFlags[1] ;
if(usePOMergeJoin){
+ // We register the merge join schema information for code generation
+ LogicalSchema logicalSchema = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
+ Schema leftSchema = null;
+ if (logicalSchema != null) {
+ leftSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
+ }
+ logicalSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
+ Schema rightSchema = null;
+ if (logicalSchema != null) {
+ rightSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
+ }
+ logicalSchema = loj.getSchema();
+ Schema mergedSchema = null;
+ if (logicalSchema != null) {
+ mergedSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
+ }
+
+ if (leftSchema != null) {
+ SchemaTupleFrontend.registerToGenerateIfPossible(leftSchema, false, GenContext.MERGE_JOIN);
+ }
+ if (rightSchema != null) {
+ SchemaTupleFrontend.registerToGenerateIfPossible(rightSchema, false, GenContext.MERGE_JOIN);
+ }
+ if (mergedSchema != null) {
+ SchemaTupleFrontend.registerToGenerateIfPossible(mergedSchema, false, GenContext.MERGE_JOIN);
+ }
+
// inner join on two sorted inputs. We have less restrictive
// implementation here in a form of POMergeJoin which doesn't
// require loaders to implement collectable interface.
try {
smj = new POMergeJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),
- parallel,inp,joinPlans,keyTypes, loj.getJoinType());
+ parallel,
+ inp,
+ joinPlans,
+ keyTypes,
+ loj.getJoinType(),
+ leftSchema,
+ rightSchema,
+ mergedSchema);
}
catch (PlanException e) {
int errCode = 2042;
@@ -1083,11 +1157,11 @@ public class LogToPhyTranslationVisitor
boolean[] innerFlags, MultiMap<Integer, LogicalExpressionPlan> innerPlans) throws FrontendException {
POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
- DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelisam());
+ DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelism());
poGlobal.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());
poGlobal.setCustomPartitioner(customPartitioner);
POPackage poPackage = new POPackage(new OperatorKey(DEFAULT_SCOPE, nodeGen
- .getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelisam());
+ .getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelism());
poPackage.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());
currentPlan.add(poGlobal);
currentPlan.add(poPackage);
@@ -1107,7 +1181,7 @@ public class LogToPhyTranslationVisitor
Operator op = inputs.get(i);
List<LogicalExpressionPlan> plans = innerPlans.get(i);
POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
- DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelisam());
+ DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), relationalOp.getRequestedParallelism());
physOp.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());
List<PhysicalPlan> exprPlans = translateExpressionPlans(relationalOp, plans);
try {
@@ -1198,7 +1272,7 @@ public class LogToPhyTranslationVisitor
@Override
public void visit(LOUnion loUnion) throws FrontendException {
String scope = DEFAULT_SCOPE;
- POUnion physOp = new POUnion(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loUnion.getRequestedParallelisam());
+ POUnion physOp = new POUnion(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loUnion.getRequestedParallelism());
physOp.addOriginalLocation(loUnion.getAlias(), loUnion.getLocation());
currentPlan.add(physOp);
physOp.setResultType(DataType.BAG);
@@ -1220,7 +1294,7 @@ public class LogToPhyTranslationVisitor
@Override
public void visit(LODistinct loDistinct) throws FrontendException {
String scope = DEFAULT_SCOPE;
- PODistinct physOp = new PODistinct(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loDistinct.getRequestedParallelisam());
+ PODistinct physOp = new PODistinct(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loDistinct.getRequestedParallelism());
physOp.addOriginalLocation(loDistinct.getAlias(), loDistinct.getLocation());
currentPlan.add(physOp);
physOp.setResultType(DataType.BAG);
@@ -1241,7 +1315,7 @@ public class LogToPhyTranslationVisitor
public void visit(LOLimit loLimit) throws FrontendException {
String scope = DEFAULT_SCOPE;
POLimit poLimit = new POLimit(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
- loLimit.getRequestedParallelisam());
+ loLimit.getRequestedParallelism());
poLimit.setLimit(loLimit.getLimit());
poLimit.addOriginalLocation(loLimit.getAlias(), loLimit.getLocation());
poLimit.setResultType(DataType.BAG);
@@ -1279,7 +1353,7 @@ public class LogToPhyTranslationVisitor
public void visit(LOSplit loSplit) throws FrontendException {
String scope = DEFAULT_SCOPE;
POSplit physOp = new POSplit(new OperatorKey(scope, nodeGen
- .getNextNodeId(scope)), loSplit.getRequestedParallelisam());
+ .getNextNodeId(scope)), loSplit.getRequestedParallelism());
physOp.addOriginalLocation(loSplit.getAlias(), loSplit.getLocation());
FileSpec splStrFile;
try {
@@ -1332,7 +1406,7 @@ public class LogToPhyTranslationVisitor
String scope = DEFAULT_SCOPE;
// System.err.println("Entering Filter");
POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
- .getNextNodeId(scope)), loSplitOutput.getRequestedParallelisam());
+ .getNextNodeId(scope)), loSplitOutput.getRequestedParallelism());
poFilter.addOriginalLocation(loSplitOutput.getAlias(), loSplitOutput.getLocation());
poFilter.setResultType(DataType.BAG);
currentPlan.add(poFilter);
@@ -1404,34 +1478,6 @@ public class LogToPhyTranslationVisitor
}
- private boolean validateMergeJoin(LOJoin loj) throws FrontendException{
-
- List<Operator> preds = plan.getPredecessors(loj);
-
- int errCode = 1101;
- String errMsg = "Merge Join must have exactly two inputs.";
- if(preds.size() != 2)
- throw new LogicalToPhysicalTranslatorException(errMsg+" Found: "+preds.size(),errCode);
-
- return mergeJoinValidator(preds,loj.getPlan());
- }
-
- private boolean mergeJoinValidator(List<Operator> preds,OperatorPlan lp) throws FrontendException {
-
- int errCode = 1103;
- String errMsg = "Merge join only supports Filter, Foreach, filter and Load as its predecessor. Found : ";
- if(preds != null && !preds.isEmpty()){
- for(Operator lo : preds){
- if (!(lo instanceof LOFilter || lo instanceof LOLoad || lo instanceof LOForEach))
- throw new LogicalToPhysicalTranslatorException(errMsg, errCode);
- // All is good at this level. Visit predecessors now.
- mergeJoinValidator(lp.getPredecessors(lo),lp);
- }
- }
- // We visited everything and all is good.
- return true;
- }
-
private void translateSoftLinks(Operator op) throws FrontendException {
List<Operator> preds = op.getPlan().getSoftLinkPredecessors(op);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java Tue Jul 3 20:36:09 2012
@@ -103,7 +103,7 @@ abstract public class LogicalRelationalO
* Get the requestedParallelism for this operator.
* @return requestedParallelsim
*/
- public int getRequestedParallelisam() {
+ public int getRequestedParallelism() {
return requestedParallelism;
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java Tue Jul 3 20:36:09 2012
@@ -88,7 +88,7 @@ public class GroupByConstParallelSetter
Operator op = iter.next();
if (op instanceof LOCogroup) {
LOCogroup group = (LOCogroup)op;
- if(group.getRequestedParallelisam() > 1){
+ if(group.getRequestedParallelism() > 1){
log.warn("Resetting parallism to 1 for the group/cogroup " +
group.getAlias() +
" as the group by expressions returns a constant");
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java Tue Jul 3 20:36:09 2012
@@ -198,7 +198,7 @@ public class MergeForEach extends Rule {
LogicalPlan newForEachInnerPlan = new LogicalPlan();
newForEach.setInnerPlan(newForEachInnerPlan);
newForEach.setAlias(foreach2.getAlias());
- newForEach.setRequestedParallelism(foreach1.getRequestedParallelisam());
+ newForEach.setRequestedParallelism(foreach1.getRequestedParallelism());
List<LogicalExpressionPlan> newExpList = new ArrayList<LogicalExpressionPlan>();
LOGenerate newGen = new LOGenerate(newForEachInnerPlan, newExpList, gen2.getFlattenFlags());
newGen.setUserDefinedSchema(gen2.getUserDefinedSchema());
Added: pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java (added)
+++ pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java Tue Jul 3 20:36:09 2012
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import static junit.framework.Assert.assertEquals;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.InterRecordReader;
+import org.apache.pig.impl.io.InterRecordWriter;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.pig.impl.util.Utils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSchemaTuple {
+ // Properties rebuilt before every test (perTestInitialize) with schema tuple
+ // generation enabled; conf and pigContext are derived from them.
+ private Properties props;
+ // Hadoop configuration built from props; passed both to the frontend copy step
+ // and to the backend initialization in testCompileAndResolve.
+ private Configuration conf;
+ // Local-mode PigContext built from props.
+ private PigContext pigContext;
+ // Shared BinInterSedes instance. NOTE(review): not referenced in the visible part
+ // of this class; presumably used by the serialization tests further down — confirm.
+ private static final BinInterSedes bis = new BinInterSedes();
+
+ @Before
+ public void perTestInitialize() {
+ SchemaTupleFrontend.reset();
+ SchemaTupleBackend.reset();
+
+ props = new Properties();
+ props.setProperty(SchemaTupleBackend.SHOULD_GENERATE_KEY, "true");
+
+ conf = ConfigurationUtil.toConfiguration(props);
+ pigContext = new PigContext(ExecType.LOCAL, props);
+ }
+
+ /**
+ * Registers a set of schemas with the frontend, compiles and ships them, then
+ * checks on the backend that exactly the registered (schema, appendable, context)
+ * combinations resolve to a working SchemaTupleFactory.
+ */
+ @Test
+ public void testCompileAndResolve() throws Exception {
+ // Each schema string is defined once and used by both the frontend registration
+ // and the backend lookup, so the two phases cannot drift apart.
+ String intSchema = "a:int";
+ String longSchema = "a:long";
+ String nestedCharSchema = "a:chararray,(a:chararray)";
+ String deeplyNestedSchema = "a:int,(a:int,(a:int,(a:int,(a:int,(a:int,(a:int))))))";
+ String tuplePairsSchema = "((a:int,b:int),(a:int,b:int),(a:int,b:int)),((a:int,b:int),(a:int,b:int),(a:int,b:int))";
+ String allTypesSchema = "a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double,"
+ +"(a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double,"
+ +"(a:int, b:long, c:chararray, d:boolean, e:bytearray, f:float, g:double))";
+ // 114 boolean fields. The previous hand-written literal lacked a comma between two
+ // of its lines, silently fusing them into one untyped field named "booleanboolean".
+ String manyBooleansSchema = repeatJoined("boolean", 114);
+ // 13 copies of every simple type plus a doubly nested tuple of them. The previous
+ // literal had no comma between copies (it read "...double))int, ...").
+ String manyTypesSchema = repeatJoined("int, long, chararray, boolean, bytearray, float, double,"
+ +"(int, long, chararray, boolean, bytearray, float, double,"
+ +"(int, long, chararray, boolean, bytearray, float, double))", 13);
+ String bagSchema = "int, b:bag{(int,int,int)}";
+
+ //frontend
+ registerForGeneration(intSchema, false, GenContext.UDF);
+ registerForGeneration(longSchema, true, GenContext.UDF);
+ registerForGeneration(nestedCharSchema, false, GenContext.LOAD);
+ registerForGeneration(deeplyNestedSchema, false, GenContext.LOAD);
+ registerForGeneration(tuplePairsSchema, false, GenContext.LOAD);
+ registerForGeneration(allTypesSchema, false, GenContext.LOAD);
+ registerForGeneration(manyBooleansSchema, false, GenContext.LOAD);
+ registerForGeneration(manyTypesSchema, false, GenContext.LOAD);
+ registerForGeneration(manyTypesSchema, true, GenContext.LOAD);
+ registerForGeneration(bagSchema, false, GenContext.LOAD);
+
+ // this compiles and "ships"
+ SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
+
+ //backend
+ SchemaTupleBackend.initialize(conf, ExecType.LOCAL);
+
+ resolveAndExercise(intSchema, false, GenContext.UDF);
+ // Same schema, but never registered for this context.
+ assertNotGenerated(intSchema, false, GenContext.MERGE_JOIN);
+
+ resolveAndExercise(longSchema, true, GenContext.UDF);
+ // Only the appendable variant of a:long was registered.
+ assertNotGenerated(longSchema, false, GenContext.UDF);
+
+ resolveAndExercise(nestedCharSchema, false, GenContext.LOAD);
+ // A schema that was never registered must not resolve.
+ assertNotGenerated("(a:chararray)", false, GenContext.LOAD);
+
+ resolveAndExercise(deeplyNestedSchema, false, GenContext.LOAD);
+ resolveAndExercise(tuplePairsSchema, false, GenContext.LOAD);
+ resolveAndExercise(allTypesSchema, false, GenContext.LOAD);
+ resolveAndExercise(manyBooleansSchema, false, GenContext.LOAD);
+ resolveAndExercise(manyTypesSchema, false, GenContext.LOAD);
+ resolveAndExercise(manyTypesSchema, true, GenContext.LOAD);
+ resolveAndExercise(bagSchema, false, GenContext.LOAD);
+ }
+
+ /** Parses {@code schema} and registers it with the frontend for code generation. */
+ private static void registerForGeneration(String schema, boolean isAppendable, GenContext context) throws Exception {
+ SchemaTupleFrontend.registerToGenerateIfPossible(Utils.getSchemaFromString(schema), isAppendable, context);
+ }
+
+ /** Resolves the generated factory for {@code schema} and runs every tuple check against it. */
+ private void resolveAndExercise(String schema, boolean isAppendable, GenContext context) throws Exception {
+ Schema udfSchema = Utils.getSchemaFromString(schema);
+ SchemaTupleFactory tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+ putThroughPaces(tf, udfSchema, isAppendable);
+ }
+
+ /** Asserts that no generated factory exists for {@code schema} with the given flags and context. */
+ private static void assertNotGenerated(String schema, boolean isAppendable, GenContext context) throws Exception {
+ assertNull(SchemaTupleFactory.getInstance(Utils.getSchemaFromString(schema), isAppendable, context));
+ }
+
+ /** Joins {@code times} copies of {@code unit}, separated by ", ". */
+ private static String repeatJoined(String unit, int times) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < times; i++) {
+ if (i > 0) {
+ sb.append(", ");
+ }
+ sb.append(unit);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Runs the full battery of tuple checks against {@code tf}; appendable
+ * factories must hand out AppendableSchemaTuple and also get the append check.
+ */
+ private void putThroughPaces(SchemaTupleFactory tf, Schema udfSchema, boolean isAppendable) throws Exception {
+ assertNotNull(tf);
+ if (!isAppendable) {
+ testNotAppendable(tf, udfSchema);
+ return;
+ }
+ assertTrue(tf.newTuple() instanceof AppendableSchemaTuple);
+ testNotAppendable(tf, udfSchema);
+ testAppendable(tf, udfSchema);
+ }
+
+ /**
+ * Appends one value to a fresh tuple and checks that its size is one more
+ * than the number of fields in {@code udfSchema}.
+ */
+ private void testAppendable(SchemaTupleFactory tf, Schema udfSchema) {
+ SchemaTuple<?> tuple = tf.newTuple();
+ tuple.append("woah");
+ int expectedSize = udfSchema.size() + 1;
+ assertEquals(expectedSize, tuple.size());
+ }
+
+ /**
+ * Checks that a fresh tuple from {@code tf} matches {@code udfSchema}, then runs
+ * the all-null, copy/compare and serialization checks against the factory.
+ */
+ private void testNotAppendable(SchemaTupleFactory tf, Schema udfSchema) throws Exception {
+ SchemaTuple<?> st = tf.newTuple();
+ // The result of Schema.equals used to be discarded, so a schema mismatch could
+ // never fail the test. NOTE(review): the flags are presumably
+ // (relaxInner=false, relaxAlias=true) per Schema.equals' signature — confirm.
+ assertTrue(Schema.equals(udfSchema, st.getSchema(), false, true));
+
+ assertEquals(udfSchema.size(), st.size());
+
+ shouldAllBeNull(tf);
+
+ copyThenCompare(tf);
+
+ testSerDe(tf);
+ testInterStorageSerDe(tf);
+ }
+
+ /**
+ * Copies a populated tuple into a second one and checks that equals and
+ * compareTo agree, both before and after nulling out the first field.
+ */
+ private void copyThenCompare(SchemaTupleFactory tf) throws ExecException {
+ SchemaTuple<?> st = tf.newTuple();
+ SchemaTuple<?> st2 = tf.newTuple();
+ fillWithData(st);
+ st2.set(st);
+ assertTrue(st.equals(st2));
+ assertEquals(0, st.compareTo(st2));
+ st.set(0, null);
+ assertFalse(st.equals(st2));
+ // A null field sorts before a non-null one. compareTo only promises a negative
+ // value, so do not pin it to exactly -1.
+ assertTrue(st.compareTo(st2) < 0);
+ assertTrue(st.isNull(0));
+ st2.set(0, null);
+ assertTrue(st.equals(st2));
+ assertEquals(0, st.compareTo(st2));
+ }
+
+ /**
+ * Asserts that a Tuple fresh out of the factory has only null fields, checked
+ * both by iteration and by positional access.
+ * @param tf a TupleFactory
+ * @throws ExecException
+ */
+ private void shouldAllBeNull(SchemaTupleFactory tf) throws ExecException {
+ Tuple fresh = tf.newTuple();
+ for (Object field : fresh) {
+ assertNull(field);
+ }
+ int idx = 0;
+ while (idx < fresh.size()) {
+ assertNull(fresh.get(idx));
+ assertTrue(fresh.isNull(idx));
+ idx++;
+ }
+ }
+
+ /**
+ * Populates every field of {@code st} with random data; tuple-typed fields get a
+ * nested SchemaTuple (FORCE_LOAD context) that is filled recursively.
+ */
+ private void fillWithData(SchemaTuple<?> st) throws ExecException {
+ List<FieldSchema> fields = st.getSchema().getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ FieldSchema field = fields.get(i);
+ if (field.type != DataType.TUPLE) {
+ st.set(i, randData(field));
+ continue;
+ }
+ SchemaTuple<?> nested = SchemaTupleFactory
+ .getInstance(field.schema, false, GenContext.FORCE_LOAD)
+ .newTuple();
+ fillWithData(nested);
+ st.set(i, nested);
+ }
+ }
+
+ private Random r = new Random(100L);
+
+ private Object randData(FieldSchema fs) throws ExecException {
+ switch (fs.type) {
+ case DataType.BOOLEAN: return r.nextBoolean();
+ case DataType.BYTEARRAY: return new DataByteArray(new BigInteger(130, r).toByteArray());
+ case DataType.CHARARRAY: return new BigInteger(130, r).toString(32);
+ case DataType.INTEGER: return r.nextInt();
+ case DataType.LONG: return r.nextLong();
+ case DataType.FLOAT: return r.nextFloat();
+ case DataType.DOUBLE: return r.nextDouble();
+ case DataType.BAG:
+ DataBag db = BagFactory.getInstance().newDefaultBag();
+ int sz = r.nextInt(100);
+ for (int i = 0; i < sz; i++) {
+ int tSz = r.nextInt(10);
+ Tuple t = TupleFactory.getInstance().newTuple(tSz);
+ for (int j = 0; j < tSz; j++) {
+ t.set(j, r.nextInt());
+ }
+ db.add(t);
+ }
+ return db;
+ default: throw new RuntimeException("Cannot generate data for given FieldSchema: " + fs);
+ }
+ }
+
+ public void testTypeAwareGetSetting(TupleFactory tf) throws ExecException {
+ SchemaTuple<?> st = (SchemaTuple<?>)tf.newTuple();
+ checkNullGetThrowsError(st);
+ }
+
+ /**
+ * Asserts that, for every field of {@code st}, the getter for that field's type
+ * throws FieldIsNullException.
+ */
+ private void checkNullGetThrowsError(SchemaTuple<?> st) throws ExecException {
+ List<FieldSchema> fields = st.getSchema().getFields();
+ for (int pos = 0; pos < fields.size(); pos++) {
+ FieldSchema field = fields.get(pos);
+ boolean threw = false;
+ try {
+ switch (field.type) {
+ case DataType.BOOLEAN: st.getBoolean(pos); break;
+ case DataType.BYTEARRAY: st.getBytes(pos); break;
+ case DataType.CHARARRAY: st.getString(pos); break;
+ case DataType.INTEGER: st.getInt(pos); break;
+ case DataType.LONG: st.getLong(pos); break;
+ case DataType.FLOAT: st.getFloat(pos); break;
+ case DataType.DOUBLE: st.getDouble(pos); break;
+ case DataType.TUPLE: st.getTuple(pos); break;
+ case DataType.BAG: st.getDataBag(pos); break;
+ default: throw new RuntimeException("Unsupported FieldSchema in SchemaTuple: " + field);
+ }
+ } catch (FieldIsNullException expected) {
+ threw = true;
+ }
+ assertTrue(threw);
+ }
+ }
+
+    /**
+     * Round-trips tuples produced by {@code tf} through InterStorage's record
+     * writer and reader, and checks that they come back equal and in order.
+     * Random junk bytes are written before the first record and after each
+     * record, so the reader must locate records by InterStorage's marker bytes,
+     * as it would at the start of an arbitrary split.
+     *
+     * @param tf factory used to create the SchemaTuples under test
+     * @throws Exception on any I/O or Hadoop setup error
+     */
+    public void testInterStorageSerDe(SchemaTupleFactory tf) throws Exception {
+        final int count = 4096;
+        List<Tuple> expected = new ArrayList<Tuple>(count);
+
+        File tmpFile = File.createTempFile("tmp", "tmp");
+        tmpFile.deleteOnExit();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpFile));
+        InterRecordWriter writer = new InterRecordWriter(out);
+
+        // Leading random bytes emulate whatever precedes the first record in a split.
+        for (int b = 0; b < 6; b++) {
+            out.writeByte(r.nextInt());
+        }
+
+        for (int n = 0; n < count; n++) {
+            SchemaTuple<?> tuple = (SchemaTuple<?>) tf.newTuple();
+            fillWithData(tuple);
+            writer.write(null, tuple);
+            expected.add(tuple);
+            // Random bytes after each record that the reader has to skip.
+            for (int b = 0; b < 3; b++) {
+                out.writeByte(r.nextInt());
+            }
+        }
+        writer.close(null);
+
+        Configuration conf = new Configuration();
+        conf.set("fs.default.name", "file:///");
+        TaskAttemptID taskId = HadoopShims.createTaskAttemptID("jt", 1, true, 1, 1);
+        conf.set("mapred.task.id", taskId.toString());
+
+        InputSplit split = new FileSplit(new Path(tmpFile.getAbsolutePath()), 0, tmpFile.length(), null);
+        InterRecordReader reader = new InterRecordReader();
+        reader.initialize(split, new TaskAttemptContext(conf, taskId));
+
+        for (Tuple want : expected) {
+            assertTrue(reader.nextKeyValue());
+            assertEquals(want, (SchemaTuple<?>) reader.getCurrentValue());
+        }
+        reader.close();
+
+    }
+
+    /**
+     * Writes tuples produced by {@code tf} to a temporary file with the class's
+     * {@code bis} serializer ({@code writeDatum}), reads them back with
+     * {@code readDatum}, and checks each one equals what was written, in order.
+     *
+     * @param tf factory used to create the SchemaTuples under test
+     * @throws IOException on any file or serialization error
+     */
+    public void testSerDe(SchemaTupleFactory tf) throws IOException {
+        final int count = 4096;
+        List<Tuple> expected = new ArrayList<Tuple>(count);
+
+        File tmpFile = File.createTempFile("tmp", "tmp");
+        tmpFile.deleteOnExit();
+
+        FileOutputStream rawOut = new FileOutputStream(tmpFile);
+        DataOutput out = new DataOutputStream(rawOut);
+        for (int n = 0; n < count; n++) {
+            SchemaTuple<?> tuple = (SchemaTuple<?>) tf.newTuple();
+            fillWithData(tuple);
+            bis.writeDatum(out, tuple);
+            expected.add(tuple);
+        }
+        rawOut.close();
+
+        assertEquals(count, expected.size());
+
+        FileInputStream rawIn = new FileInputStream(tmpFile);
+        DataInput in = new DataInputStream(rawIn);
+        for (Tuple want : expected) {
+            assertEquals(want, (SchemaTuple<?>) bis.readDatum(in));
+        }
+        rawIn.close();
+    }
+
+    /** Replicated (fragment-replicate) join with SchemaTuples enabled; inputs are not pre-sorted. */
+    @Test
+    public void testFRJoinWithSchemaTuple() throws Exception {
+        final boolean sortInputsFirst = false;
+        testJoinType("replicated", sortInputsFirst);
+    }
+
+    /** Merge join with SchemaTuples enabled; both inputs are ORDERed first, as merge join expects sorted input. */
+    @Test
+    public void testMergeJoinWithSchemaTuple() throws Exception {
+        final boolean sortInputsFirst = true;
+        testJoinType("merge", sortInputsFirst);
+    }
+
+ public void testJoinType(String joinType, boolean preSort) throws Exception {
+ Properties props = PropertiesUtil.loadDefaultProperties();
+ props.setProperty("pig.schematuple", "true");
+ PigServer pigServer = new PigServer(ExecType.LOCAL, props);
+
+ Data data = resetData(pigServer);
+
+ data.set("foo1",
+ tuple(0),
+ tuple(1),
+ tuple(2),
+ tuple(3),
+ tuple(4),
+ tuple(5),
+ tuple(6),
+ tuple(7),
+ tuple(8),
+ tuple(9)
+ );
+
+ data.set("foo2",
+ tuple(0),
+ tuple(1),
+ tuple(2),
+ tuple(3),
+ tuple(4),
+ tuple(5),
+ tuple(6),
+ tuple(7),
+ tuple(8),
+ tuple(9)
+ );
+
+ pigServer.registerQuery("A = LOAD 'foo1' USING mock.Storage() as (x:int);");
+ pigServer.registerQuery("B = LOAD 'foo2' USING mock.Storage() as (x:int);");
+ if (preSort) {
+ pigServer.registerQuery("A = ORDER A BY x ASC;");
+ pigServer.registerQuery("B = ORDER B BY x ASC;");
+ }
+ pigServer.registerQuery("C = JOIN A by x, B by x using '"+joinType+"';");
+ pigServer.registerQuery("D = ORDER C BY $0 ASC;");
+
+ Iterator<Tuple> out = pigServer.openIterator("D");
+ for (int i = 0; i < 10; i++) {
+ if (!out.hasNext()) {
+ throw new Exception("Output should have had more elements! Failed on element: " + i);
+ }
+ assertEquals(tuple(i, i), out.next());
+ }
+ assertFalse(out.hasNext());
+
+ pigServer.registerQuery("STORE D INTO 'bar' USING mock.Storage();");
+
+ List<Tuple> tuples = data.get("bar");
+
+ if (tuples.size() != 10) {
+ throw new Exception("Output does not have enough elements! List: " + tuples);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ assertEquals(tuple(i, i), tuples.get(i));
+ }
+
+ }
+
+}
Added: pig/trunk/test/org/apache/pig/data/utils/TestMethodHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/utils/TestMethodHelper.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/data/utils/TestMethodHelper.java (added)
+++ pig/trunk/test/org/apache/pig/data/utils/TestMethodHelper.java Tue Jul 3 20:36:09 2012
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data.utils;
+
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Method;
+
+import org.apache.pig.data.utils.MethodHelper.NotImplemented;
+import org.junit.Test;
+
+public class TestMethodHelper {
+ @Test
+ public void testImplementation() throws NoSuchMethodException {
+ Method t1 = ITest.class.getMethod("t1");
+ Method t2 = ITest.class.getMethod("t2");
+
+ shouldBe(t1, Test1.class, true);
+ shouldBe(t2, Test1.class, true);
+
+ shouldBe(t1, Test2.class, false);
+ shouldBe(t2, Test2.class, true);
+
+ shouldBe(t1, Test3.class, true);
+ shouldBe(t2, Test3.class, true);
+
+ shouldBe(t1, Test4.class, false);
+ shouldBe(t2, Test4.class, true);
+
+ shouldBe(t1, Test5.class, false);
+ shouldBe(t2, Test5.class, false);
+
+ shouldBe(t1, Test6.class, true);
+ shouldBe(t2, Test6.class, false);
+
+ shouldBe(t1, Test7.class, false);
+ shouldBe(t2, Test7.class, false);
+ }
+
+ private void shouldBe(Method m, Class c, boolean b) {
+ assertTrue(MethodHelper.isNotImplementedAnnotationPresent(m, c) == b);
+ }
+
+ static interface ITest {
+ public void t1();
+
+ public void t2();
+ }
+
+ static class Test1 implements ITest {
+ @NotImplemented
+ public void t1() {}
+
+ @NotImplemented
+ public void t2() {}
+ }
+
+ static class Test2 implements ITest {
+ public void t1() {}
+
+ @NotImplemented
+ public void t2() {}
+ }
+
+ static class Test3 extends Test1 {}
+
+ static class Test4 extends Test2 {}
+
+ static class Test5 extends Test3 {
+ public void t1() {}
+
+ public void t2() {}
+ }
+
+ static class Test6 extends Test5 {
+ @NotImplemented
+ public void t1() {}
+ }
+
+ static class Test7 extends Test6 {
+ public void t1() {}
+ }
+
+}
Modified: pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestDataBag.java Tue Jul 3 20:36:09 2012
@@ -185,8 +185,7 @@ public class TestDataBag extends junit.f
// Write tuples into both
for (int j = 0; j < 3; j++) {
for (int i = 0; i < 10; i++) {
- Tuple t = TupleFactory.getInstance().newTupleForSchema(DataType.INTEGER);
- t.set(0, i);
+ Tuple t = TupleFactory.getInstance().newTuple(new Integer(i));
b.add(t);
rightAnswer.add(t);
}
Modified: pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue Jul 3 20:36:09 2012
@@ -253,7 +253,7 @@ public class TestLogicalPlanBuilder {
Operator lo = listOp.get(0);
if (lo instanceof LOCogroup) {
- Assert.assertEquals( 1, ((LOCogroup) lo).getRequestedParallelisam() );//Local mode, paraallel = 1
+ Assert.assertEquals( 1, ((LOCogroup) lo).getRequestedParallelism() );//Local mode, paraallel = 1
} else {
Assert.fail("Error: Unexpected Parse Tree output");
}
Modified: pig/trunk/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSchema.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSchema.java Tue Jul 3 20:36:09 2012
@@ -16,24 +16,6 @@
* limitations under the License.
*/
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.pig.test;
import java.io.IOException;