You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/22 22:17:22 UTC
svn commit: r614325 [2/6] - in /incubator/pig/branches/types: ./ lib/
scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/
src/org/apache/pig/data/ src/org/apache/pig/impl/
src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/
src/org/apac...
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java Tue Jan 22 13:17:12 2008
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,319 +17,102 @@
*/
package org.apache.pig.data;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.eval.collector.DataCollector;
-
-
-/**
- * A collection of Data values of a given type. For performance reasons
- * types are not checked on add or read.
- */
-public class DataBag extends ComplexDatum {
-
-/**
- * Create an empty bag of the indicated type.
- * @param elementType what this will be a bag of.
- */
-public DataBag(Datum.DataType elementType)
-{
- mContent = new ArrayList<Datum>();
- mElementType = elementType;
-}
-
-/**
- * Create a bag based on a list. The element type will be taken from the
- * first Datum in the list.
- * @param c list of data to place in the bag. This list cannot be empty.
- * @throws IOException if the list is empty.
- */
-public DataBag(List<Datum> c) throws IOException
-{
- mContent = c;
- if (c.size() == 0) {
- throw new IOException("Attempt to instantiate empty bag with no type.");
- }
- mElementType = c.get(0).getType();
-}
-
-/**
- * Create a bag given a single datum. The element type will be taken from
- * the datum.
- * @param t The datum to use. The datum must not be java null (pig null
- * is ok).
- * @throws IOException if the datum is null.
- */
-public DataBag(Datum t) throws IOException
-{
- if (t == null) {
- throw new IOException("Attempt to instantiate empty bag with no type.");
- }
- mContent = new ArrayList<Datum>();
- mContent.add(t);
- mElementType = t.getType();
-}
-
-/**
- * @return BAG
- */
-public DataType getType() { return Datum.DataType.BAG; }
-
-/**
- * Find out what this is a bag of.
- * @return datatype
- */
-public DataType bagOf() { return mElementType; }
-
-public long size() { return mContent.size(); }
-
-/**
- * @deprecated Use size() instead.
- */
-public int cardinality() { return (int)size(); }
-
-/**
- * Checks if the size of the bag is empty.
- */
-public boolean isEmpty() { return mContent.size() == 0; }
-
-public int compareTo(Object other)
-{
- if (!(other instanceof Datum)) return -1;
-
- Datum od = (Datum)other;
-
- if (od.getType() != Datum.DataType.BAG) return crossTypeCompare(od);
-
- DataBag bag = (DataBag)od;
-
- Datum.DataType dt = bagOf();
- Datum.DataType dto = bag.bagOf();
- if (dt != dto) return dt.compareTo(dto);
-
- long sz = size();
- long tsz = bag.size();
- if (sz < tsz) return -1;
- else if (sz > tsz) return 1;
-
- Iterator<Datum> i = content();
- Iterator<Datum> j = bag.content();
- while (i.hasNext()) {
- Datum us = i.next();
- Datum them = j.next();
- int rc = us.compareTo(them);
- if (rc != 0) return rc;
- }
-
- return 0;
-}
-
-// Don't make this use compareTo. These functions are used in things like hashs
-// and we want them to be as fast as possible.
-@Override
-public boolean equals(Object other)
-{
- if (!(other instanceof DataBag)) return false;
-
- DataBag bag = (DataBag)other;
-
- long sz = size();
-
- if (bagOf() != bag.bagOf()) return false;
- if (bag.size() != sz) return false;
-
- Iterator<Datum> i = content();
- Iterator<Datum> j = bag.content();
- while (i.hasNext()) {
- Datum us = i.next();
- Datum them = j.next();
- if (!us.equals(them)) return false;
- }
-
- return true;
-}
-
-public void sort()
-{
- Collections.sort(mContent);
- mIsSorted = true;
-}
-
-public void sort(EvalSpec spec)
-{
- Collections.sort(mContent, spec.getComparator());
- mIsSorted = true;
-}
-
-public void arrange(EvalSpec spec)
-{
- sort(spec);
- mIsSorted = true;
-}
-
-public void distinct()
-{
- // ARG!!!! We're sorting the whole thing and then doing a distinct. Need to
- // change this to do distinct during sort.
- Collections.sort(mContent);
- mIsSorted = true;
-
- Tuple lastTup = null;
- for (Iterator<Datum> it = mContent.iterator(); it.hasNext(); ) {
- Tuple thisTup = (Tuple)it.next();
-
- if (lastTup == null) {
- lastTup = thisTup;
- continue;
- }
-
- if (thisTup.compareTo(lastTup) == 0) {
- it.remove();
- } else {
- lastTup = thisTup;
- }
- }
-}
-
-/**
- * Get an iterator to the contents of the bag. The iterator is an
- * iterator of Datum. If something else is expected the caller will have to
- * cast it.
- */
-public Iterator<Datum> content() { return mContent.iterator(); }
-
-/**
- * Add a datum to the bag. The datatype of the datum should match the
- * result of bagOf(), but that will not be checked in the interest of
- * speed. Would like this method to be final, but BigDataBag overrides it.
- */
-public void add(Datum e)
-{
- if (e != null) mContent.add(e);
-}
-
-/**
- * Add the contents of a bag to the bag. The datatype of the data should match the
- * result of bagOf(), but that will not be checked in the interest of
- * speed.
- */
-public final void addAll(DataBag b)
-{
- Iterator<Datum> it = b.content();
- while (it.hasNext()) {
- add(it.next());
- }
-}
-
-/**
- * Remove a particular datum from the bag. This operation will be slow
- * and should not be used much.
- */
-public void remove(Datum d) { mContent.remove(d); }
-
-/**
- * Returns the value of field i. Since there may be more than one tuple in the bag, this
- * function throws an exception if it is not the case that all tuples agree on this field
- */
-/*
-public DataAtom getField(int i) throws IOException
-{
- DataAtom val = null;
-
- for (Iterator<Tuple> it = mContent(); it.hasNext();) {
- DataAtom currentVal = it.next().getAtomField(i);
-
- if (val == null) {
- val = currentVal;
- } else {
- if (!val.strval().equals(currentVal.strval()))
- throw new IOException("Cannot call getField on a databag unless all tuples agree.");
- }
- }
+import java.util.ArrayList;
- if (val == null)
- throw new IOException("Cannot call getField on an empty databag.");
+import org.apache.hadoop.io.WritableComparable;
- return val;
-}
-*/
+import org.apache.pig.impl.util.Spillable;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
/**
- * Empty the bag of its contents. It retains they type of bag it is.
+ * A collection of Tuples. A DataBag may or may not fit into memory.
+ * DataBag extends spillable, which means that it registers with a memory
+ * manager. By default, it attempts to keep all of its contents in memory.
+ * If it is asked by the memory manager to spill to disk (by a call to
+ * spill()), it takes whatever it has in memory, opens a spill file, and
+ * writes the contents out. This may happen multiple times. The bag
+ * tracks all of the files it's spilled to.
+ *
+ * DataBag provides an Iterator interface that allows callers to read
+ * through the contents. The iterators are aware of the data spilling.
+ * They have to be able to handle reading from files, as well as the fact
+ * that data they were reading from memory may have been spilled to disk
+ * underneath them.
+ *
+ * The DataBag interface assumes that all data is written before any is
+ * read. That is, a DataBag cannot be used as a queue. If data is written
+ * after data is read, the results are undefined. This condition is not
+ * checked on each add or read, for reasons of speed. Caveat emptor.
+ *
+ * Since spills are asynchronous (the memory manager requesting a spill
+ * runs in a separate thread), all operations dealing with the mContents
+ * Collection (which is the collection of tuples contained in the bag) have
+ * to be synchronized. This means that reading from a DataBag is currently
+ * serialized. This is ok for the moment because pig execution is
+ * currently single threaded. A ReadWriteLock was experimented with, but
+ * it was found to be about 10x slower than using the synchronized keyword.
+ * If pig changes its execution model to be multithreaded, we may need to
+ * return to this issue, as synchronizing reads will most likely defeat the
+ * purpose of multi-threading execution.
+ *
+ * DataBags come in several types: default, sorted, and distinct. The type
+ * must be chosen up front; there is no way to convert a bag on the fly.
*/
-public void clear()
-{
- mContent.clear();
- mIsSorted = false;
-}
-
-@Override
-public void write(DataOutput out) throws IOException
-{
- out.write(Datum.DataType.BAG.getMarker());
- // Now write out the element type, so the reader knows what kind of bag to
- // instantiate.
- out.write(mElementType.getMarker());
- out.writeLong(size());
- Iterator<Datum> it = content();
- while (it.hasNext()) {
- Datum item = it.next();
- item.write(out);
- }
-}
-
-public static abstract class BagDelimiterTuple extends Tuple{}
-public static class StartBag extends BagDelimiterTuple{}
-
-public static class EndBag extends BagDelimiterTuple{}
-
-public static final Tuple startBag = new StartBag();
-public static final Tuple endBag = new EndBag();
-
-static DataBag read(DataInput in) throws IOException
-{
- DataType etype = Datum.DataType.markerToType(in.readByte());
-
- long size = in.readLong();
- DataBag ret = new DataBag(etype);
- // TODO
- //DataBag ret = BagFactory.getInstance().getNewBag();
-
- for (int i = 0; i < size; i++) {
- ret.add(DatumImpl.readDatum(in));
- }
- return ret;
-}
-
-public void markStale(boolean stale){}
-
-@Override
-public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append('{');
- Iterator<Datum> it = content();
- while ( it.hasNext() ) {
- Datum t = it.next();
- String s = t.toString();
- sb.append(s);
- if (it.hasNext())
- sb.append(", ");
- }
- sb.append('}');
- return sb.toString();
-}
-
-public boolean mIsSorted(){ return mIsSorted; }
-
-protected boolean mIsSorted = false;
-protected List<Datum> mContent;
-protected Datum.DataType mElementType;
-
+public interface DataBag extends Spillable, WritableComparable, Iterable<Tuple> {
+    /**
+     * Append one tuple to the bag.
+     * @param t the tuple to append.
+     */
+    void add(Tuple t);
+
+    /**
+     * Append every tuple of another bag to this one.
+     * @param b the bag whose tuples are appended.
+     */
+    void addAll(DataBag b);
+
+    /**
+     * Discard every tuple the bag holds, in memory and on disk alike.
+     * Reading from the bag afterwards gives undefined results.
+     */
+    void clear();
+
+    /**
+     * Set the bag's stale state. Used by FuncEvalSpec.FakeDataBag.
+     * @param stale the new stale state.
+     */
+    void markStale(boolean stale);
+
+    /**
+     * @return the number of tuples in the bag, counting both those held
+     * in memory and those spilled to disk.
+     */
+    long size();
+
+    /**
+     * @return true if the bag is sorted.
+     */
+    boolean isSorted();
+
+    /**
+     * @return true if the bag is distinct.
+     */
+    boolean isDistinct();
+
+    /**
+     * Iterate over the bag's tuples. Default and distinct bags promise no
+     * particular order; sorted bags yield tuples in the order defined by
+     * the comparator they were built with.
+     * @return an iterator over the tuples.
+     */
+    Iterator<Tuple> iterator();
+}
Added: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.lang.Class;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A class of static final values used to encode data type. This could be
+ * done as an enumeration, but it is done as byte codes instead to save
+ * creating objects. A few utility functions are also included.
+ */
+public class DataType {
+    // IMPORTANT! This list can be used to record values of data on disk,
+    // so do not change the values. You may strand user data.
+    // IMPORTANT! Order matters here, as compare() below uses the order to
+    // order unlike datatypes. Don't change this ordering.
+    // Spaced unevenly to leave room for new entries without changing
+    // values or creating order issues.
+    public static final byte UNKNOWN = 0;
+    public static final byte NULL = 1;
+    public static final byte BOOLEAN = 5;
+    public static final byte INTEGER = 10;
+    public static final byte LONG = 15;
+    public static final byte FLOAT = 20;
+    public static final byte DOUBLE = 25;
+    public static final byte BYTEARRAY = 50;
+    public static final byte CHARARRAY = 55;
+    public static final byte MAP = 100;
+    public static final byte TUPLE = 110;
+    public static final byte BAG = 120;
+    public static final byte ERROR = -1;
+
+    /**
+     * Determine the datatype of an object.
+     * @param o Object to test.
+     * @return byte code of the type, or ERROR if we don't know.
+     */
+    public static byte findType(Object o) {
+        if (o == null) return NULL;
+
+        // Try to put the most common first
+        if (o instanceof DataByteArray) return BYTEARRAY;
+        else if (o instanceof String) return CHARARRAY;
+        else if (o instanceof Tuple) return TUPLE;
+        else if (o instanceof DataBag) return BAG;
+        else if (o instanceof Integer) return INTEGER;
+        else if (o instanceof Long) return LONG;
+        else if (o instanceof Map) return MAP;
+        else if (o instanceof Float) return FLOAT;
+        else if (o instanceof Double) return DOUBLE;
+        else if (o instanceof Boolean) return BOOLEAN;
+        else return ERROR;
+    }
+
+    /**
+     * Given a Type object determine the data type it represents. This isn't
+     * cheap, as it uses reflection, so use sparingly.
+     *
+     * Tuples, bags, and maps are recognized by assignability, so both the
+     * interfaces themselves (e.g. Tuple.class, Map.class) and classes that
+     * implement them, directly or through a superclass, are classified.
+     * For a parameterized type such as {@code Map<String, Object>} the raw
+     * type is examined.
+     * @param t Type to examine
+     * @return byte code of the type, or ERROR if we don't know.
+     */
+    public static byte findType(Type t) {
+        if (t == null) return NULL;
+
+        // Generic types (e.g. from Method.getGenericReturnType()) carry
+        // their class in the raw type.
+        Type raw = (t instanceof ParameterizedType) ?
+            ((ParameterizedType)t).getRawType() : t;
+
+        // Try to put the most common first
+        if (raw == DataByteArray.class) return BYTEARRAY;
+        else if (raw == String.class) return CHARARRAY;
+        else if (raw == Integer.class) return INTEGER;
+        else if (raw == Long.class) return LONG;
+        else if (raw == Float.class) return FLOAT;
+        else if (raw == Double.class) return DOUBLE;
+        else if (raw == Boolean.class) return BOOLEAN;
+        else if (raw instanceof Class) {
+            // Might be a tuple, bag, or map. Checking only the directly
+            // declared interfaces would miss the interfaces themselves and
+            // classes that inherit them from a superclass (such as concrete
+            // subclasses of DefaultAbstractBag), so test assignability.
+            Class<?> c = (Class<?>)raw;
+            if (Tuple.class.isAssignableFrom(c)) return TUPLE;
+            else if (DataBag.class.isAssignableFrom(c)) return BAG;
+            else if (Map.class.isAssignableFrom(c)) return MAP;
+        }
+        return ERROR;
+    }
+
+    /**
+     * Get the type name.
+     * @param o Object to test.
+     * @return type name, as a String.
+     */
+    public static String findTypeName(Object o) {
+        byte dt = findType(o);
+        switch (dt) {
+        case NULL: return "NULL";
+        case BOOLEAN: return "boolean";
+        case INTEGER: return "integer";
+        case LONG: return "long";
+        case FLOAT: return "float";
+        case DOUBLE: return "double";
+        case BYTEARRAY: return "bytearray";
+        case CHARARRAY: return "chararray";
+        case MAP: return "map";
+        case TUPLE: return "tuple";
+        case BAG: return "bag";
+        default: return "Unknown";
+        }
+    }
+
+    /**
+     * Determine whether this data type is complex.
+     * @param dataType Data type code to test.
+     * @return true if dataType is bag, tuple, or map.
+     */
+    public static boolean isComplex(byte dataType) {
+        return ((dataType == BAG) || (dataType == TUPLE) ||
+            (dataType == MAP));
+    }
+
+    /**
+     * Determine whether the object is of a complex type.
+     * @param o Object to determine type of.
+     * @return true if o is a bag, tuple, or map.
+     */
+    public static boolean isComplex(Object o) {
+        return isComplex(findType(o));
+    }
+
+    /**
+     * Determine whether this data type is atomic.
+     * @param dataType Data type code to test.
+     * @return true if dataType is bytearray, chararray, integer, long,
+     * float, double, or boolean.
+     */
+    public static boolean isAtomic(byte dataType) {
+        return ((dataType == BYTEARRAY) || (dataType == CHARARRAY) ||
+            (dataType == INTEGER) || (dataType == LONG) ||
+            (dataType == FLOAT) || (dataType == DOUBLE) ||
+            (dataType == BOOLEAN));
+    }
+
+    /**
+     * Determine whether the object is of an atomic type.
+     * @param o Object to determine type of.
+     * @return true if o is a bytearray, chararray, integer, long,
+     * float, double, or boolean.
+     */
+    public static boolean isAtomic(Object o) {
+        return isAtomic(findType(o));
+    }
+
+    /**
+     * Compare two objects to each other. This function is necessary
+     * because there's no super class that implements compareTo. This
+     * function provides an (arbitrary) ordering of objects of different
+     * types as follows: NULL < BOOLEAN < INTEGER < LONG <
+     * FLOAT < DOUBLE < BYTEARRAY < CHARARRAY < MAP <
+     * TUPLE < BAG. No other functions should implement this cross
+     * object logic. They should call this function for it instead.
+     * @param o1 First object
+     * @param o2 Second object
+     * @return a negative value if o1 is less, 0 if they are equal, a
+     * positive value if o1 is greater. Objects of different types always
+     * give -1 or 1; objects of the same type give whatever that type's
+     * compareTo returns, which need not be -1 or 1.
+     * @throws RuntimeException if both objects are of the same type and
+     * findType() does not recognize it.
+     */
+    public static int compare(Object o1, Object o2) {
+        byte dt1 = findType(o1);
+        byte dt2 = findType(o2);
+
+        if (dt1 == dt2) {
+            switch (dt1) {
+            case NULL:
+                return 0;
+
+            case BOOLEAN:
+                return ((Boolean)o1).compareTo((Boolean)o2);
+
+            case INTEGER:
+                return ((Integer)o1).compareTo((Integer)o2);
+
+            case LONG:
+                return ((Long)o1).compareTo((Long)o2);
+
+            case FLOAT:
+                return ((Float)o1).compareTo((Float)o2);
+
+            case DOUBLE:
+                return ((Double)o1).compareTo((Double)o2);
+
+            case BYTEARRAY:
+                return ((DataByteArray)o1).compareTo((DataByteArray)o2);
+
+            case CHARARRAY:
+                return ((String)o1).compareTo((String)o2);
+
+            case MAP: {
+                Map<Object, Object> m1 = (Map<Object, Object>)o1;
+                Map<Object, Object> m2 = (Map<Object, Object>)o2;
+                int sz1 = m1.size();
+                int sz2 = m2.size();
+                if (sz1 < sz2) {
+                    return -1;
+                } else if (sz1 > sz2) {
+                    return 1;
+                } else {
+                    Iterator<Map.Entry<Object, Object> > i1 =
+                        m1.entrySet().iterator();
+                    Iterator<Map.Entry<Object, Object> > i2 =
+                        m2.entrySet().iterator();
+                    while (i1.hasNext()) {
+                        // This isn't real meaningful, as there are no
+                        // guarantees on iteration order in a map. But it
+                        // makes more sense than iterating through one and
+                        // probing the other, which will almost always
+                        // result in missing keys in the second and thus
+                        // not provide commutativity.
+                        Map.Entry<Object, Object> entry1 = i1.next();
+                        Map.Entry<Object, Object> entry2 = i2.next();
+                        int c = compare(entry1.getKey(), entry2.getKey());
+                        if (c != 0) {
+                            return c;
+                        } else {
+                            c = compare(entry1.getValue(), entry2.getValue());
+                            if (c != 0) {
+                                return c;
+                            }
+                        }
+                    }
+                    return 0;
+                }
+            }
+
+            case TUPLE:
+                return ((Tuple)o1).compareTo((Tuple)o2);
+
+            case BAG:
+                return ((DataBag)o1).compareTo((DataBag)o2);
+
+            default:
+                throw new RuntimeException("Unkown type " + dt1 +
+                    " in compare");
+            }
+        } else if (dt1 < dt2) {
+            return -1;
+        } else {
+            return 1;
+        }
+    }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.ArrayList;
+
+import org.apache.pig.impl.util.Spillable;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
+
+/**
+ * A collection of Tuples. A DataBag may or may not fit into memory.
+ * DataBag extends Spillable, which means that it registers with a memory
+ * manager. By default, it attempts to keep all of its contents in memory.
+ * If it is asked by the memory manager to spill to disk (by a call to
+ * spill()), it takes whatever it has in memory, opens a spill file, and
+ * writes the contents out. This may happen multiple times. The bag
+ * tracks all of the files it's spilled to.
+ *
+ * DataBag provides an Iterator interface that allows callers to read
+ * through the contents. The iterators are aware of the data spilling.
+ * They have to be able to handle reading from files, as well as the fact
+ * that data they were reading from memory may have been spilled to disk
+ * underneath them.
+ *
+ * The DataBag interface assumes that all data is written before any is
+ * read. That is, a DataBag cannot be used as a queue. If data is written
+ * after data is read, the results are undefined. This condition is not
+ * checked on each add or read, for reasons of speed. Caveat emptor.
+ *
+ * Since spills are asynchronous (the memory manager requesting a spill
+ * runs in a separate thread), all operations dealing with the mContents
+ * Collection (which is the collection of tuples contained in the bag) have
+ * to be synchronized. This means that reading from a DataBag is currently
+ * serialized. This is ok for the moment because pig execution is
+ * currently single threaded. A ReadWriteLock was experimented with, but
+ * it was found to be about 10x slower than using the synchronized keyword.
+ * If pig changes its execution model to be multithreaded, we may need to
+ * return to this issue, as synchronizing reads will most likely defeat the
+ * purpose of multi-threading execution.
+ *
+ * DataBags come in several types: default, sorted, and distinct. The type
+ * must be chosen up front; there is no way to convert a bag on the fly.
+ *
+ * This is the default implementation. Users are free to provide their
+ * own implementation, but they should keep in mind the need to support
+ * bags that do not fit in memory, and handle spilling in an efficient
+ * manner.
+ */
+public abstract class DefaultAbstractBag implements DataBag {
+    // Container that holds the tuples. Actual object instantiated by
+    // subclasses.
+    protected Collection<Tuple> mContents;
+
+    // Spill files we've created. These need to be removed in finalize.
+    protected ArrayList<File> mSpillFiles;
+
+    // Total size, including tuples on disk. Stored here so we don't have
+    // to run through the disk when people ask.
+    protected long mSize = 0;
+
+    // Set whenever the in-memory contents change, so that getMemorySize()
+    // knows its cached estimate is stale.
+    protected boolean mMemSizeChanged = false;
+
+    // Cached result of the last getMemorySize() computation.
+    protected long mMemSize = 0;
+
+    /**
+     * Get the number of elements in the bag, both in memory and on disk.
+     */
+    public long size() {
+        return mSize;
+    }
+
+    /**
+     * Add a tuple to the bag.
+     * @param t tuple to add.
+     */
+    public void add(Tuple t) {
+        synchronized (mContents) {
+            mMemSizeChanged = true;
+            mSize++;
+            mContents.add(t);
+        }
+    }
+
+    /**
+     * Add contents of a bag to the bag.
+     * @param b bag to add contents of.
+     */
+    public void addAll(DataBag b) {
+        synchronized (mContents) {
+            mMemSizeChanged = true;
+            mSize += b.size();
+            Iterator<Tuple> i = b.iterator();
+            while (i.hasNext()) mContents.add(i.next());
+        }
+    }
+
+    /**
+     * Add contents of a container to the bag.
+     * @param c Collection to add contents of.
+     */
+    public void addAll(Collection<Tuple> c) {
+        synchronized (mContents) {
+            mMemSizeChanged = true;
+            mSize += c.size();
+            Iterator<Tuple> i = c.iterator();
+            while (i.hasNext()) mContents.add(i.next());
+        }
+    }
+
+    /**
+     * Estimate the memory used by the tuples currently held in memory.
+     */
+    @Override
+    public long getMemorySize() {
+        if (!mMemSizeChanged) return mMemSize;
+
+        long used = 0;
+        // I can't afford to walk through all the tuples every time the
+        // memory manager wants to know if it's time to dump. Just sample
+        // the first 100 and see what we get. This may not be 100%
+        // accurate, but it's just an estimate anyway.
+        int j;
+        int numInMem = 0;
+        synchronized (mContents) {
+            numInMem = mContents.size();
+            // Measure only what's in memory, not what's on disk.
+            Iterator<Tuple> i = mContents.iterator();
+            for (j = 0; i.hasNext() && j < 100; j++) {
+                used += i.next().getMemorySize();
+            }
+        }
+
+        if (numInMem > 100) {
+            // Estimate the per tuple size. Do it in integer arithmetic
+            // (even though it will be slightly less accurate) for speed.
+            used /= j;
+            used *= numInMem;
+        }
+
+        mMemSize = used;
+        mMemSizeChanged = false;
+        return used;
+    }
+
+    /**
+     * Clear out the contents of the bag, both on disk and in memory.
+     * Any attempts to read after this is called will produce undefined
+     * results.
+     */
+    public void clear() {
+        synchronized (mContents) {
+            mContents.clear();
+            if (mSpillFiles != null) {
+                for (int i = 0; i < mSpillFiles.size(); i++) {
+                    mSpillFiles.get(i).delete();
+                }
+                mSpillFiles.clear();
+            }
+            mSize = 0;
+            // Force getMemorySize() to recompute instead of returning the
+            // estimate cached before the bag was emptied.
+            mMemSizeChanged = true;
+        }
+    }
+
+    /**
+     * Compare this bag to another object. Bags are ordered first by size.
+     * Bags of equal size are compared tuple by tuple; sorted and distinct
+     * bags are iterated as-is and any other bag is first copied into a
+     * SortedDataBag, so that insertion order does not affect the result.
+     * Non-bag objects are ordered relative to bags by DataType.compare().
+     * This method is potentially very expensive since it may require a
+     * sort of the bag; don't call it unless you have to.
+     */
+    public int compareTo(Object other) {
+        if (this == other)
+            return 0;
+        if (other instanceof DataBag) {
+            DataBag bOther = (DataBag) other;
+            if (this.size() != bOther.size()) {
+                if (this.size() > bOther.size()) return 1;
+                else return -1;
+            }
+
+            // Ugh, this is bogus. But I have to know if two bags have the
+            // same tuples, regardless of order. Hopefully most of the
+            // time the size check above will prevent this.
+            // If either bag isn't already sorted, create a sorted bag out
+            // of it so I can guarantee order.
+            // NOTE(review): treating DistinctDataBag as ordered assumes its
+            // iterator yields sorted tuples; DataBag does not promise that.
+            DataBag thisClone;
+            DataBag otherClone;
+            if (this instanceof SortedDataBag ||
+                    this instanceof DistinctDataBag) {
+                thisClone = this;
+            } else {
+                thisClone = new SortedDataBag(null);
+                Iterator<Tuple> i = iterator();
+                while (i.hasNext()) thisClone.add(i.next());
+            }
+            // Decide from the other bag's own type; testing this bag's type
+            // here would skip sorting the other bag whenever this is distinct.
+            if (other instanceof SortedDataBag ||
+                    other instanceof DistinctDataBag) {
+                otherClone = bOther;
+            } else {
+                otherClone = new SortedDataBag(null);
+                Iterator<Tuple> i = bOther.iterator();
+                while (i.hasNext()) otherClone.add(i.next());
+            }
+            Iterator<Tuple> thisIt = thisClone.iterator();
+            Iterator<Tuple> otherIt = otherClone.iterator();
+            while (thisIt.hasNext() && otherIt.hasNext()) {
+                Tuple thisT = thisIt.next();
+                Tuple otherT = otherIt.next();
+
+                int c = thisT.compareTo(otherT);
+                if (c != 0) return c;
+            }
+
+            return 0; // if we got this far, they must be equal
+        } else {
+            return DataType.compare(this, other);
+        }
+    }
+
+    /**
+     * Two objects are equal exactly when compareTo() returns 0.
+     */
+    @Override
+    public boolean equals(Object other) {
+        return compareTo(other) == 0;
+    }
+
+    /**
+     * Write a bag's contents to disk.
+     * @param out DataOutput to write data to.
+     * @throws IOException (passes it on from underlying calls).
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        // We don't care whether this bag was sorted or distinct because
+        // using the iterator to write it will guarantee those things come
+        // correctly. And on the other end there'll be no reason to waste
+        // time re-sorting or re-applying distinct.
+        out.write(DataType.BAG);
+        out.writeLong(size());
+        Iterator<Tuple> it = iterator();
+        while (it.hasNext()) {
+            Tuple item = it.next();
+            item.write(out);
+        }
+    }
+
+    /**
+     * Read a bag from disk.
+     * @param in DataInput to read data from.
+     * @throws IOException (passes it on from underlying calls).
+     */
+    public void readFields(DataInput in) throws IOException {
+        long size = in.readLong();
+
+        for (long i = 0; i < size; i++) {
+            Object o = DataReaderWriter.readDatum(in);
+            add((Tuple)o);
+        }
+    }
+
+    /**
+     * This is used by FuncEvalSpec.FakeDataBag.
+     * @param stale Set stale state.
+     */
+    public void markStale(boolean stale)
+    {
+    }
+
+    /**
+     * Write the bag into a string. */
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append('{');
+        Iterator<Tuple> it = iterator();
+        while ( it.hasNext() ) {
+            Tuple t = it.next();
+            String s = t.toString();
+            sb.append(s);
+            if (it.hasNext()) sb.append(", ");
+        }
+        sb.append('}');
+        return sb.toString();
+    }
+
+    /**
+     * Hash the bag's tuples. equals() ignores the order in which tuples
+     * were added, so the hash must not depend on iteration order either;
+     * summing the tuple hashes achieves that.
+     */
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        Iterator<Tuple> i = iterator();
+        while (i.hasNext()) {
+            // Addition is commutative, so iteration order cannot matter.
+            hash += i.next().hashCode();
+        }
+        return hash;
+    }
+
+    /**
+     * Delete any spill files this bag created when it is garbage collected.
+     */
+    @Override
+    protected void finalize() {
+        if (mSpillFiles != null) {
+            for (int i = 0; i < mSpillFiles.size(); i++) {
+                mSpillFiles.get(i).delete();
+            }
+        }
+    }
+
+    /**
+     * Get a file to spill contents to. The file will be registered in the
+     * mSpillFiles array.
+     * @return stream to write tuples to.
+     */
+    protected DataOutputStream getSpillFile() throws IOException {
+        if (mSpillFiles == null) {
+            // We want to keep the list as small as possible.
+            mSpillFiles = new ArrayList<File>(1);
+        }
+
+        File f = File.createTempFile("pigbag", null);
+        f.deleteOnExit();
+        mSpillFiles.add(f);
+        return new DataOutputStream(new BufferedOutputStream(
+            new FileOutputStream(f)));
+    }
+
+    /**
+     * Report progress through PigMapReduce.reporter, if one is set.
+     */
+    protected void reportProgress() {
+        if (PigMapReduce.reporter != null) {
+            PigMapReduce.reporter.progress();
+        }
+    }
+
+    public static abstract class BagDelimiterTuple extends DefaultTuple{}
+    public static class StartBag extends BagDelimiterTuple{}
+
+    public static class EndBag extends BagDelimiterTuple{}
+
+    public static final Tuple startBag = new StartBag();
+    public static final Tuple endBag = new EndBag();
+
+    protected static final int MAX_SPILL_FILES = 100;
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+
+/**
+ * Default implementation of BagFactory. Each factory method builds a new
+ * bag of the requested kind, passes it to registerBag(), and returns it.
+ */
+public class DefaultBagFactory extends BagFactory {
+    /**
+     * Get a default (unordered, not distinct) data bag.
+     * @return a new DefaultDataBag that has been passed to registerBag().
+     */
+    public DataBag newDefaultBag() {
+        return registerAndReturn(new DefaultDataBag());
+    }
+
+    /**
+     * Get a sorted data bag.
+     * @param spec EvalSpec that controls how the data is sorted.
+     * If null, default comparator will be used.
+     * @return a new SortedDataBag that has been passed to registerBag().
+     */
+    public DataBag newSortedBag(EvalSpec spec) {
+        return registerAndReturn(new SortedDataBag(spec));
+    }
+
+    /**
+     * Get a distinct data bag.
+     * @return a new DistinctDataBag that has been passed to registerBag().
+     */
+    public DataBag newDistinctBag() {
+        return registerAndReturn(new DistinctDataBag());
+    }
+
+    // Package-private: instances are presumably obtained through BagFactory
+    // rather than constructed directly by users.
+    DefaultBagFactory() {
+        super();
+    }
+
+    /** Pass a freshly built bag to registerBag() and hand it back. */
+    private DataBag registerAndReturn(DataBag bag) {
+        registerBag(bag);
+        return bag;
+    }
+}
+
Copied: incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java (from r610055, incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java)
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java?p2=incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java&p1=incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java&r1=610055&r2=614325&rev=614325&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java Tue Jan 22 13:17:12 2008
@@ -35,7 +35,9 @@
* are stored in an ArrayList, since there is no concern for order or
* distinctness.
*/
-public class DefaultDataBag extends DataBag {
+public class DefaultDataBag extends DefaultAbstractBag {
+
+ private static TupleFactory gTupleFactory = TupleFactory.getInstance();
public DefaultDataBag() {
mContents = new ArrayList<Tuple>();
@@ -162,7 +164,7 @@
"Unable to find our spill file", fnfe);
throw new RuntimeException(fnfe);
}
- Tuple t = new Tuple();
+ Tuple t = gTupleFactory.newTuple();
for (int i = 0; i < mMemoryPtr; i++) {
try {
t.readFields(mIn);
@@ -195,7 +197,7 @@
private Tuple readFromFile() {
if (mIn != null) {
// We already have a file open
- Tuple t = new Tuple();
+ Tuple t = gTupleFactory.newTuple();
try {
t.readFields(mIn);
return t;
Added: incubator/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Default implementation of TupleFactory. Every tuple it creates is a
+ * DefaultTuple.
+ */
+public class DefaultTupleFactory extends TupleFactory {
+    /**
+     * Create a tuple with no fields; values can be added with append().
+     */
+    public Tuple newTuple() {
+        return new DefaultTuple();
+    }
+
+    /**
+     * Create a tuple with a fixed number of fields, to be filled in with set().
+     * @param size number of fields.
+     */
+    public Tuple newTuple(int size) {
+        return new DefaultTuple(size);
+    }
+
+    /**
+     * Create a tuple from a list of field values.
+     * NOTE(review): whether c is copied or referenced is decided by
+     * DefaultTuple(List) — confirm before mutating c afterwards.
+     * @param c field values, in field order.
+     */
+    public Tuple newTuple(List c) {
+        return new DefaultTuple(c);
+    }
+
+    /**
+     * Create a one-field tuple holding the given value.
+     * @param datum value to store in field 0.
+     */
+    public Tuple newTuple(Object datum) {
+        Tuple t = new DefaultTuple(1);
+        try {
+            t.set(0, datum);
+        } catch (IOException e) {
+            // Tuple.set() throws only for a field number at or beyond the
+            // tuple's size; field 0 of a newly allocated one-field tuple is
+            // always in range, so reaching this means Tuple itself is broken.
+            throw new RuntimeException("Unable to write to field 0 in newly " +
+                "allocated tuple of size 1!", e);
+        }
+        return t;
+    }
+
+    DefaultTupleFactory() {
+    }
+}
+
Copied: incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java (from r610055, incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java)
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java?p2=incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java&p1=incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java&r1=610055&r2=614325&rev=614325&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java Tue Jan 22 13:17:12 2008
@@ -50,7 +50,9 @@
* ArrayList and then sorted. Dispite all these machinations, this was
* found to be faster than storing it in a TreeSet.
*/
-public class DistinctDataBag extends DataBag {
+public class DistinctDataBag extends DefaultAbstractBag {
+ private static TupleFactory gTupleFactory = TupleFactory.getInstance();
+
public DistinctDataBag() {
mContents = new HashSet<Tuple>();
}
@@ -246,7 +248,7 @@
// Fast foward past the tuples we've already put in the
// queue.
- Tuple t = new Tuple();
+ Tuple t = gTupleFactory.newTuple();
for (int i = 0; i < mMemoryPtr; i++) {
try {
t.readFields(in);
@@ -357,7 +359,7 @@
DataInputStream in = mStreams.get(fileNum);
if (in != null) {
// There's still data in this file
- c.tuple = new Tuple();
+ c.tuple = gTupleFactory.newTuple();
do {
try {
c.tuple.readFields(in);
Modified: incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java Tue Jan 22 13:17:12 2008
@@ -24,7 +24,7 @@
/**
* This is an internal class that keeps track of the specific input that a Tuple came from
*/
-public class IndexedTuple extends Tuple {
+public class IndexedTuple extends DefaultTuple {
public int index = -1;
@@ -32,7 +32,9 @@
}
public IndexedTuple(Tuple t, int indexIn) {
- super(t);
+ // Have to do it like this because Tuple is an interface, we don't
+ // have access to its internal structures.
+ super(t.getAll());
index = indexIn;
}
@@ -45,19 +47,15 @@
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
- out.writeInt(index);
- //encodeInt(out, index);
+ out.writeInt(index);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
index = in.readInt();
- //index = decodeInt(in);
}
public Tuple toTuple(){
- Tuple t = new Tuple();
- t.mFields = mFields;
- return t;
+ return TupleFactory.getInstance().newTuple(mFields);
}
}
Copied: incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java (from r610055, incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java)
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java?p2=incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java&p1=incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java&r1=610055&r2=614325&rev=614325&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java Tue Jan 22 13:17:12 2008
@@ -48,7 +48,9 @@
* We allow a user defined comparator, but provide a default comparator in
* cases where the user doesn't specify one.
*/
-public class SortedDataBag extends DataBag {
+public class SortedDataBag extends DefaultAbstractBag {
+ private static TupleFactory gTupleFactory = TupleFactory.getInstance();
+
private Comparator<Tuple> mComp;
private boolean mReadStarted = false;
@@ -244,7 +246,7 @@
// Fast foward past the tuples we've already put in the
// queue.
- Tuple t = new Tuple();
+ Tuple t = gTupleFactory.newTuple();
for (int i = 0; i < mMemoryPtr; i++) {
try {
t.readFields(in);
@@ -351,7 +353,7 @@
DataInputStream in = mStreams.get(fileNum);
if (in != null) {
// There's still data in this file
- c.tuple = new Tuple();
+ c.tuple = gTupleFactory.newTuple();
try {
c.tuple.readFields(in);
mMergeQ.add(c);
Modified: incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java Tue Jan 22 13:17:12 2008
@@ -21,7 +21,7 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-public class TimestampedTuple extends Tuple {
+public class TimestampedTuple extends DefaultTuple {
protected double timestamp = 0; // timestamp of this tuple
protected boolean heartbeat = false; // true iff this is a heartbeat (i.e. purpose is just to convey new timestamp; carries no data)
@@ -42,13 +42,14 @@
super(numFields);
}
+ /*
public TimestampedTuple(String textLine, String delimiter, int timestampColumn,
SimpleDateFormat dateFormat){
if (delimiter == null) {
delimiter = defaultDelimiter;
}
String[] splitString = textLine.split(delimiter, -1);
- mFields = new ArrayList<Datum>(splitString.length-1);
+ fields = new ArrayList<Datum>(splitString.length-1);
for (int i = 0; i < splitString.length; i++) {
if (i==timestampColumn){
try{
@@ -57,10 +58,11 @@
System.err.println("Could not parse timestamp " + splitString[i]);
}
}else{
- mFields.add(new DataAtom(splitString[i]));
+ fields.add(new DataAtom(splitString[i]));
}
}
}
+ */
}
Modified: incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java Tue Jan 22 13:17:12 2008
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,568 +17,107 @@
*/
package org.apache.pig.data;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
-import org.apache.log4j.Logger;
-
import org.apache.hadoop.io.WritableComparable;
-import org.apache.pig.impl.util.PigLogger;
-
-/**
- * an ordered list of Data
- */
-public class Tuple extends ComplexDatum implements WritableComparable
-{
-
-static String defaultDelimiter = "[,\t]";
-static String NULL = "__PIG_NULL__";
-
-public static final byte RECORD_1 = 0x21;
-public static final byte RECORD_2 = 0x31;
-public static final byte RECORD_3 = 0x41;
-
-/**
- * Construct a tuple with no fields.
- */
-public Tuple() { this(0); }
-
-/**
- * Construct a tuple with a known number of fields. The fields will be
- * pre-populated with nulls.
- * @param numFields Number of fields in the tuple.
- */
-public Tuple(long numFields)
-{
- this(numFields, true);
-}
-
-/**
- * Construct a tuple with a knwn number of fields.
- * @param numFields Number of fields in the tuple.
- * @param prepopulate If true, prepopulate with nulls, otherwise leave the
- * tuple empty. Should be called with false only by Tuple.read()
- */
-private Tuple(long numFields, boolean prepopulate)
-{
- mFields = new ArrayList<Datum>((int)numFields);
- if (prepopulate) {
- for (int i = 0; i < numFields; i++) {
- mFields.add(null);
- }
- }
-}
-
-/**
- * Construct a tuple from an existing List.
- */
-public Tuple(List<Datum> mFieldsIn)
-{
- mFields = new ArrayList<Datum>(mFieldsIn.size());
- mFields.addAll(mFieldsIn);
-}
-
-/**
- * Construct a tuple from an existing tuple. The fields are not copied,
- * only referenced.
- */
-public Tuple(Tuple t)
-{
- mFields = new ArrayList<Datum>(t.mFields.size());
- mFields.addAll(t.mFields);
-}
-
-/**
- * shortcut, if tuple only has one field
- */
-public Tuple(Datum fieldIn)
-{
- mFields = new ArrayList<Datum>(1);
- mFields.add(fieldIn);
-}
-
-/**
- * Creates a tuple from a delimited line of text. For now creates elements as
- * DataAtoms. This should change once we have expressions that can handle the
- * new types.
- *
- * @param textLine
- * the line containing mFields of data
- * @param delimiter
- * a regular expression of the form specified by String.split(). If null, the default
- * delimiter "[,\t]" will be used.
- */
-public Tuple(String textLine, String delimiter)
-{
- if (delimiter == null) {
- delimiter = defaultDelimiter;
- }
- String[] splitString = textLine.split(delimiter, -1);
- mFields = new ArrayList<Datum>(splitString.length);
- for (int i = 0; i < splitString.length; i++) {
- mFields.add(new DataAtom(splitString[i]));
- }
-}
-
-/**
- * Creates a tuple from a delimited line of text. This will invoke Tuple(textLine, null)
- *
- * @param textLine
- * the line containing mFields of data
- */
-public Tuple(String textLine) { this(textLine, defaultDelimiter); }
-
-public Tuple(Tuple[] otherTs)
-{
- mFields = new ArrayList<Datum>(otherTs.length);
- for (int i = 0; i < otherTs.length; i++) {
- appendTuple(otherTs[i]);
- }
-}
-
-public DataType getType() { return Datum.DataType.TUPLE; }
-
-public long size() { return mFields.size(); }
-
-public void copyFrom(Tuple otherT)
-{
- this.mFields = otherT.mFields;
-}
-
-/**
- * @deprecated Using size instead.
- */
-public int arity() { return (int)size(); }
-
-@Override
-public String toString()
-{
- StringBuffer sb = new StringBuffer();
- sb.append('(');
- for (Iterator<Datum> it = mFields.iterator(); it.hasNext();) {
- Datum d = it.next();
- if(d != null) {
- sb.append(d.toString());
- } else {
- sb.append(NULL);
- }
- if (it.hasNext())
- sb.append(", ");
- }
- sb.append(')');
- String s = sb.toString();
- return s;
-}
-
-public final void setField(long i, Datum val) throws IOException
-{
- if (i >= mFields.size()) {
- throw new IOException("Column number out of range, tried to access " + i + " in a tuple of only " + mFields.size() + "columns");
- }
- mFields.set((int)i, val);
-}
-
/**
- * Set a field with an int value. Push it into a DataAtom for the moment,
- * eventually we'll change this to a DataInteger.
- * @param val integer value to set field to.
- */
-public final void setField(int i, int val) throws IOException
-{
- setField(i, new DataAtom(val));
-}
-
-/**
- * Set a field with a double value. Push it into a DataAtom for the moment,
- * eventually we'll change this to a DataDouble.
- * @param val double value to set field to.
- */
-public void setField(int i, double val) throws IOException
-{
- setField(i, new DataAtom(val));
-}
-
-/**
- * Set a field with an string value. Push it into a DataAtom for the moment,
- * eventually we'll change this to a DataCharArrayUtf16.
- * @param val string value to set field to.
- */
-public void setField(int i, String val) throws IOException
-{
- setField(i, new DataAtom(val));
-}
-
-/**
- * Get a field from the tuple.
- * @param i Field to get
- * @return Field value, as Datum.
- */
-public final Datum getField(long i)
-{
- Datum d = null;
- if ((i >= mFields.size()) || ((d = mFields.get((int)i)) == null)) {
- d = new DataUnknown();
- d.setNull(true);
- }
- return d;
-}
-
-/**
- * @deprecated Use specific types instead of DataAtom.
- * Get field i, if it is an Atom or can be coerced into an Atom.
- * @param i field to get as an atom.
- * @return contents of the field. If its of type DataAtom then that will
- * be returned. If it's of tuple of one field, then that field will be
- * returned. If it's a bag of one element, then that element will be
- * returned. If it's one of the new atomic types (int, etc.) it will push
- * that into a data atom and return that.
- * @throws IOException if the field isn't an atom and it can't figure out
- * how to do the coercion.
- */
-public DataAtom getAtomField(int i) throws IOException
-{
- Datum field = getField(i); // throws exception if field doesn't exist
-
- // This shouldn't actually ever happen anymore.
- if (field instanceof DataAtom) return (DataAtom) field;
-
- switch (field.getType()) {
- case INT: return new DataAtom(((DataInteger)field).get());
- case LONG: return new DataAtom(((DataLong)field).get());
- case FLOAT: return new DataAtom(((DataFloat)field).get());
- case DOUBLE: return new DataAtom(((DataDouble)field).get());
- case UNKNOWN: return new DataAtom(((DataUnknown)field).get());
- case CHARARRAY:
- switch (((DataCharArray)field).getEncoding()) {
- case UTF16:
- return new DataAtom(((DataCharArrayUtf16)field).get());
- case NONE:
- return new DataAtom(((DataCharArrayNone)field).get());
- default: throw new AssertionError("Unknown encoding");
- }
-
- // Can't use getFieldAsAtomic for tuple and bag because these need to
- // recurse to getAtomField instead.
- case TUPLE: {
- Tuple t = (Tuple) field;
- if (t.size() == 1) {
- PigLogger.getLogger().warn("Asked for an atom field but found a tuple with one field.");
- return t.getAtomField(0);
- }
- break;
- }
-
- case BAG: {
- DataBag b = (DataBag) field;
- if (b.bagOf() == Datum.DataType.TUPLE && b.size() == 1) {
- Tuple t = (Tuple)b.content().next();
- if (t.size() == 1) {
- PigLogger.getLogger().warn("Asked for an atom field but found a bag with a tuple with one field.");
- return t.getAtomField(0);
- }
- }
- break;
- }
-
- default: break;
- }
- throw new IOException("Incompatible type for request getAtomField().");
-}
-
-/**
- * Get field i, if it is an Atomic type or can be coerced into an Atomic
- * type.
- * @param i field to get as an atomic type.
- * @return contents of the field. If its an atomic type it will be
- * returned as is. If it is unknown, it will be converted to a char array
- * none and returned as that. If it's of tuple of one field, then
- * that field will be returned. If it's a bag of one element, then
- * that element will be returned.
- * @throws IOException if the field isn't an atom and it can't figure out
- * how to do the coercion.
- */
-public AtomicDatum getFieldAsAtomic(int i) throws IOException
-{
- Datum field = getField(i); // throws exception if field doesn't exist
-
- if (field.getDimension() == Datum.DataDimension.ATOMIC) {
- return (AtomicDatum)field;
- }
-
- switch (field.getType()) {
- case UNKNOWN:
- return new DataCharArrayNone(((DataUnknown)field).get());
-
- case TUPLE: {
- Tuple t = (Tuple) field;
- if (t.size() == 1) {
- PigLogger.getLogger().warn("Warning: Asked for an atom field but found a tuple with one field.");
- return t.getFieldAsAtomic(0);
- }
- break;
- }
-
- case BAG: {
- DataBag b = (DataBag) field;
- if (b.bagOf() == Datum.DataType.TUPLE && b.size() == 1) {
- Tuple t = (Tuple)b.content().next();
- if (t.size() == 1) {
- PigLogger.getLogger().warn("Warning: Asked for an atom field but found a bag with a tuple with one field.");
- return t.getFieldAsAtomic(0);
- }
- }
- break;
- }
-
- default: break;
- }
-
- throw new IOException("Incompatible type for request getAtomField().");
-}
-
-/**
- * Attempt to fetch a field as a tuple.
- * @param i field number to get.
- * @return If the field is a tuple, return it. If it's bag of one tuple,
- * return it. Otherwise...
- * @throws IOException if the field is neither a tuple nor a bag of one
- * tuple.
- */
-public Tuple getTupleField(int i) throws IOException
-{
- Datum field = getField(i); // throws exception if field doesn't exist
-
- if (field.getType() == Datum.DataType.TUPLE) {
- return (Tuple) field;
- } else if (field.getType() == Datum.DataType.BAG) {
- DataBag b = (DataBag) field;
- if (b.bagOf() == Datum.DataType.TUPLE && b.size() == 1) {
- return (Tuple)b.content().next();
- }
- }
-
- throw new IOException("Incompatible type for request getTupleField().");
-}
-
-/**
- * Attempt to fetch a field as a bag.
- * @param i field number to get.
- * @return If the field is a bag, return it. Otherwise...
- * @throws IOException if the field is not a bag.
- */
-public DataBag getBagField(int i) throws IOException
-{
- Datum field = getField(i); // throws exception if field doesn't exist
-
- if (field.getType() == Datum.DataType.BAG) return (DataBag) field;
-
- throw new IOException("Incompatible type for request getBagField().");
-}
-
-public final void appendTuple(Tuple other)
-{
- for (Iterator<Datum> it = other.mFields.iterator(); it.hasNext();) {
- mFields.add(it.next());
- }
-}
-
-public final void appendField(Datum newField) { mFields.add(newField); }
-
-public String toDelimitedString(String delim) throws IOException
-{
- StringBuffer buf = new StringBuffer();
- for (Iterator<Datum> it = mFields.iterator(); it.hasNext();) {
- Datum field = it.next();
- if (field.getDimension() == Datum.DataDimension.COMPLEX) {
- throw new IOException("Unable to convert non-flat tuple to string.");
- }
-
- buf.append(field.toString());
- if (it.hasNext()) buf.append(delim);
- }
- return buf.toString();
-}
-
-/*
- public boolean lessThan(Tuple other) {
- return (this.compareTo(other) < 0);
- }
-
- public boolean greaterThan(Tuple other) {
- return (this.compareTo(other) > 0);
- }
-
- */
-
-/**
- * See if two tuples are equal to each other. Tuple equality is defined as being
- * of the same size, and for each field f1...fn in t1 and fields g1...gn in t2,
- * f1.equals(g1) ... fn.equals(gn) holds true.
- * Don't make this use compareTo. These functions are used in things like hashs
- * and we want them to be as fast as possible.
- */
-@Override
-public boolean equals(Object other)
-{
- if (!(other instanceof Tuple)) return false;
-
- Tuple t = (Tuple)other;
-
- long sz = size();
-
- if (t.size() != sz) return false;
-
- for (long i = 0; i < sz; i++) {
- if (!t.getField(i).equals(getField(i))) return false;
- }
-
- return true;
-}
-
-public int compareTo(Object other)
-{
- if (!(other instanceof Datum)) return -1;
-
- Datum od = (Datum)other;
-
- if (od.getType() != Datum.DataType.TUPLE) return crossTypeCompare(od);
-
- Tuple t = (Tuple)od;
-
- long sz = size();
- long tsz = t.size();
- if (sz < tsz) return -1;
- else if (sz > tsz) return 1;
-
- for (long i = 0; i < sz; i++) {
- int c = mFields.get((int)i).compareTo(t.mFields.get((int)i));
- if (c != 0) return c;
- }
- return 0;
-}
-
-@Override
-public int hashCode()
-{
- int hash = 1;
- for (Iterator<Datum> it = mFields.iterator(); it.hasNext();) {
- Datum f = it.next();
- if (f == null) hash += 1;
- else hash = 31 * hash + f.hashCode();
- }
- return hash;
-}
-
-// WritableComparable methods:
-
-@Override
-public void write(DataOutput out) throws IOException
-{
- out.write(Datum.DataType.TUPLE.getMarker());
- long n = size();
- out.writeLong(n);
- for (long i = 0; i < n; i++) {
- Datum d = getField((int)i);
- if (d != null){
- d.write(out);
- } else {
- throw new RuntimeException("Null field in tuple");
- }
- }
-}
-
-/**
- * This method is invoked when the beginning 'TUPLE' is still on the stream.
- * @param in DataInput to read from
- * @throws IOExcetion if the expected data isn't a tuple.
+ * An ordered list of Data. A tuple has fields, numbered 0 through
+ * (number of fields - 1). The entry in the field can be any datatype,
+ * or it can be null.
+ *
+ * Tuples are constructed only by a TupleFactory. A DefaultTupleFactory
+ * is provided by the system. If a user wishes to use their own type of
+ * Tuple, they should also provide an implementation of TupleFactory to
+ * construct their types of Tuples.
+ *
+ * Fields are numbered from 0.
*/
-public void readFields(DataInput in) throws IOException
-{
- byte[] b = new byte[1];
- in.readFully(b);
- if (b[0] != Datum.DataType.TUPLE.getMarker())
- throw new IOException("Unexpected data while reading tuple from binary file");
- Tuple t = read(in);
- mFields = t.mFields;
-}
-
-//This method is invoked when the beginning 'TUPLE' has been read off the stream
-public static Tuple read(DataInput in) throws IOException
-{
- long size = in.readLong();
-
- // nuke the old contents of the tuple
- Tuple ret = new Tuple(size, false);
-
- for (int i = 0; i < size; i++) {
- ret.appendField(DatumImpl.readDatum(in));
- }
-
- return ret;
-}
-
- /*
-public static Datum readDatum(DataInput in) throws IOException
-{
- byte[] b = new byte[1];
- in.readFully(b);
- switch (b[0]) {
- case TUPLE:
- return Tuple.read(in);
- case BAG:
- return DataBag.read(in);
- case MAP:
- return DataMap.read(in);
- case INT:
- return DataInt.read(in);
- case LONG:
- return DataLong.read(in);
- case FLOAT:
- return DataFloat.read(in);
- case DOUBLE:
- return DataDouble.read(in);
- case UNKNOWN:
- return DataUnknown.read(in);
- default:
- throw new AssertionError("Invalid data type indicator " + b[0] + " while reading Datum from binary file");
- }
-}
-
- // Encode the integer so that the high bit is set on the last
- // byte
- static void encodeInt(DataOutput os, int i) throws IOException {
- if (i >> 28 != 0)
- os.write((i >> 28) & 0x7f);
- if (i >> 21 != 0)
- os.write((i >> 21) & 0x7f);
- if (i >> 14 != 0)
- os.write((i >> 14) & 0x7f);
- if (i >> 7 != 0)
- os.write((i >> 7) & 0x7f);
- os.write((i & 0x7f) | (1 << 7));
- }
-
- static int decodeInt(DataInput is) throws IOException {
- int i = 0;
- int c;
- while (true) {
- c = is.readUnsignedByte();
- if (c == -1)
- break;
- i <<= 7;
- i += c & 0x7f;
- if ((c & 0x80) != 0)
- break;
- }
- return i;
- }
- */
-
-protected ArrayList<Datum> mFields;
-
+public interface Tuple extends WritableComparable {
+    /**
+     * Make this tuple reference the contents of another. This method does not copy
+     * the underlying data. It maintains references to the data from the original
+     * tuple (and possibly even to the data structure holding the data).
+     * @param t Tuple to reference.
+     */
+    void reference(Tuple t);
+
+    /**
+     * Find the size of the tuple. Used to be called arity().
+     * @return number of fields in the tuple.
+     */
+    int size();
+
+    /**
+     * Find out if a given field is null.
+     * @param fieldNum Number of field to check for null.
+     * @return true if the field is null, false otherwise.
+     * @throws IOException if the field number given is greater
+     * than or equal to the number of fields in the tuple.
+     */
+    boolean isNull(int fieldNum) throws IOException;
+
+    /**
+     * Find the type of a given field.
+     * @param fieldNum Number of field to get the type for.
+     * @return type, encoded as a byte value. The values are taken from
+     * the class {@code DataType}. If the field is null, then
+     * {@code DataType.UNKNOWN} will be returned.
+     * @throws IOException if the field number is greater than or equal to
+     * the number of fields in the tuple.
+     */
+    byte getType(int fieldNum) throws IOException;
+
+    /**
+     * Get the value in a given field.
+     * @param fieldNum Number of the field to get the value for.
+     * @return value, as an Object.
+     * @throws IOException if the field number is greater than or equal to
+     * the number of fields in the tuple.
+     */
+    Object get(int fieldNum) throws IOException;
+
+    /**
+     * Get all of the fields in the tuple as a list.
+     * @return {@code List<Object>} containing the fields of the tuple
+     * in order.
+     */
+    List<Object> getAll();
+
+    /**
+     * Set the value in a given field.
+     * @param fieldNum Number of the field to set the value for.
+     * @param val Object to put in the indicated field.
+     * @throws IOException if the field number is greater than or equal to
+     * the number of fields in the tuple.
+     */
+    void set(int fieldNum, Object val) throws IOException;
+
+    /**
+     * Append a field to a tuple. This method is not efficient as it may
+     * force copying of existing data in order to grow the data structure.
+     * Whenever possible you should construct your Tuple with the
+     * {@link TupleFactory} method {@code newTuple(int)} and then fill in the
+     * values with {@code set()}, rather than construct it with
+     * {@code newTuple()} and append values.
+     * @param val Object to append to the tuple.
+     */
+    void append(Object val);
+
+    /**
+     * Determine the size of tuple in memory. This is used by data bags
+     * to determine their memory size. This need not be exact, but it
+     * should be a decent estimation.
+     * @return estimated memory size.
+     */
+    long getMemorySize();
+
+    /**
+     * Write a tuple of atomic values into a string. All values in the
+     * tuple must be atomic (no bags, tuples, or maps).
+     * @param delim Delimiter to use in the string.
+     * @return A string containing the tuple.
+     * @throws IOException if a non-atomic value is found.
+     */
+    String toDelimitedString(String delim) throws IOException;
}
Added: incubator/pig/branches/types/src/org/apache/pig/data/TupleFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/TupleFactory.java?rev=614325&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/TupleFactory.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/TupleFactory.java Tue Jan 22 13:17:12 2008
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.lang.Class;
+import java.lang.ClassLoader;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+
+/**
+ * A factory to construct tuples. This class is abstract so that users can
+ * override the tuple factory if they desire to provide their own that
+ * returns their implementation of a tuple. If the property
+ * pig.data.tuple.factory.name is set to a class name and
+ * pig.data.tuple.factory.jar is set to a URL pointing to a jar that
+ * contains the above named class, then getInstance() will create a
+ * a instance of the named class using the indicatd jar. Otherwise, it
+ * will create and instance of DefaultTupleFactory.
+ */
+public abstract class TupleFactory {
+ private static TupleFactory gSelf = null;
+
+ /**
+ * Get a reference to the singleton factory.
+ */
+ public static TupleFactory getInstance() {
+ if (gSelf == null) {
+ String factoryName =
+ System.getProperty("pig.data.tuple.factory.name");
+ String factoryJar =
+ System.getProperty("pig.data.tuple.factory.jar");
+ if (factoryName != null && factoryJar != null) {
+ try {
+ URL[] urls = new URL[1];
+ urls[0] = new URL(factoryJar);
+ ClassLoader loader = new URLClassLoader(urls,
+ TupleFactory.class.getClassLoader());
+ Class c = Class.forName(factoryName, true, loader);
+ Object o = c.newInstance();
+ if (!(o instanceof TupleFactory)) {
+ throw new RuntimeException("Provided factory " +
+ factoryName + " does not extend TupleFactory!");
+ }
+ gSelf = (TupleFactory)o;
+ } catch (Exception e) {
+ if (e instanceof RuntimeException) {
+ // We just threw this
+ RuntimeException re = (RuntimeException)e;
+ throw re;
+ }
+ throw new RuntimeException("Unable to instantiate "
+ + "tuple factory " + factoryName, e);
+ }
+ } else {
+ gSelf = new DefaultTupleFactory();
+ }
+ }
+ return gSelf;
+ }
+
+ /**
+ * Create an empty tuple. This should be used as infrequently as
+ * possible, use newTuple(int) instead.
+ */
+ public abstract Tuple newTuple();
+
+ /**
+ * Create a tuple with size fields. Whenever possible this is prefered
+ * over the nullary constructor, as the constructor can preallocate the
+ * size of the container holding the fields.
+ * @param size Number of fields in the tuple.
+ */
+ public abstract Tuple newTuple(int size);
+
+ /**
+ * Create a tuple from the provided list of objects.
+ * @param c List of objects to use as the fields of the tuple.
+ */
+ public abstract Tuple newTuple(List c);
+
+ /**
+ * Create a tuple with a single element. This is useful because of
+ * the fact that bags (currently) only take tuples, we often end up
+ * sticking a single element in a tuple in order to put it in a bag.
+ * @param datum Datum to put in the tuple.
+ */
+ public abstract Tuple newTuple(Object datum);
+
+ protected TupleFactory() {
+ }
+
+ /**
+ * Provided for testing purposes only. This function should never be
+ * called by anybody but the unit tests.
+ */
+ public static void resetSelf() {
+ gSelf = null;
+ }
+
+}
+
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java Tue Jan 22 13:17:12 2008
@@ -19,6 +19,7 @@
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.reflect.Constructor;
@@ -42,6 +43,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.JobSubmissionProtocol;
import org.apache.hadoop.mapred.JobTracker;
@@ -75,7 +77,7 @@
transient private JobConf conf = null;
// extra jar files that are needed to run a job
- transient public List<String> extraJars = new LinkedList<String>();
+ transient public List<URL> extraJars = new LinkedList<URL>();
// The jars that should not be merged in. (Some functions may come from pig.jar and we don't want the whole jar file.)
transient public Vector<String> skipJars = new Vector<String>(2);
@@ -89,7 +91,6 @@
// connection to hadoop jobtracker (stays as null if doing local execution)
transient private JobSubmissionProtocol jobTracker;
transient private JobClient jobClient;
- transient private Logger mLogger;
private String jobName = JOB_NAME_PREFIX; // can be overwritten by users
@@ -109,12 +110,6 @@
public PigContext(ExecType execType){
this.execType = execType;
- /*
- mLogger = Logger.getLogger("org.apache.pig");
- mLogger.setAdditivity(false);
- */
- mLogger = PigLogger.getLogger();
-
initProperties();
String pigJar = JarManager.findContainingJar(Main.class);
@@ -134,6 +129,8 @@
}
private void initProperties() {
+ Logger log = PigLogger.getLogger();
+
Properties fileProperties = new Properties();
try{
@@ -158,15 +155,16 @@
//Now set these as system properties only if they are not already defined.
for (Object o: fileProperties.keySet()){
String propertyName = (String)o;
- mLogger.debug("Found system property " + propertyName + " in .pigrc");
+ log.debug("Found system property " + propertyName + " in .pigrc");
if (System.getProperty(propertyName) == null){
System.setProperty(propertyName, fileProperties.getProperty(propertyName));
- mLogger.debug("Setting system property " + propertyName);
+ log.debug("Setting system property " + propertyName);
}
}
}
public void connect(){
+ Logger log = PigLogger.getLogger();
try{
if (execType != ExecType.LOCAL){
//First set the ssh socket factory
@@ -202,10 +200,10 @@
lfs = FileSystem.getNamed("local", conf);
- mLogger.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
+ log.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
dfs = FileSystem.get(conf);
- mLogger.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
+ log.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
JobSubmissionProtocol.versionID, JobTracker.getAddress(conf), conf);
jobClient = new JobClient(conf);
@@ -236,6 +234,7 @@
};
private String[] doHod(String server) {
+ Logger log = PigLogger.getLogger();
if (hodMapRed != null) {
return new String[] {hodHDFS, hodMapRed};
}
@@ -253,7 +252,10 @@
cmd.append(System.getProperty("hod.command"));
//String cmd = System.getProperty("hod.command", "/home/breed/startHOD.expect");
String cluster = System.getProperty("yinst.cluster");
- if (cluster.length() > 0 && !cluster.startsWith("kryptonite")) {
+ // TODO This is a Yahoo specific holdover, need to remove
+ // this.
+ if (cluster != null && cluster.length() > 0 &&
+ !cluster.startsWith("kryptonite")) {
cmd.append(" --config=");
cmd.append(System.getProperty("hod.config.dir"));
cmd.append('/');
@@ -267,8 +269,8 @@
p = fac.ssh(cmd.toString());
}
InputStream is = p.getInputStream();
- mLogger.info("Connecting to HOD...");
- mLogger.debug("sending HOD command " + cmd.toString());
+ log.info("Connecting to HOD...");
+ log.debug("sending HOD command " + cmd.toString());
StringBuffer sb = new StringBuffer();
int c;
String hdfsUI = null;
@@ -282,23 +284,23 @@
switch(current) {
case HDFSUI:
hdfsUI = sb.toString().trim();
- mLogger.info("HDFS Web UI: " + hdfsUI);
+ log.info("HDFS Web UI: " + hdfsUI);
break;
case HDFS:
hdfs = sb.toString().trim();
- mLogger.info("HDFS: " + hdfs);
+ log.info("HDFS: " + hdfs);
break;
case MAPREDUI:
mapredUI = sb.toString().trim();
- mLogger.info("JobTracker Web UI: " + mapredUI);
+ log.info("JobTracker Web UI: " + mapredUI);
break;
case MAPRED:
mapred = sb.toString().trim();
- mLogger.info("JobTracker: " + mapred);
+ log.info("JobTracker: " + mapred);
break;
case HADOOPCONF:
hadoopConf = sb.toString().trim();
- mLogger.info("HadoopConf: " + hadoopConf);
+ log.info("HadoopConf: " + hadoopConf);
break;
}
current = ParsingState.NOTHING;
@@ -334,7 +336,7 @@
{
conf = new JobConf(hadoopConf);
// make sure that files on class path are used
- conf.addFinalResource("pig-cluster-hadoop-site.xml");
+ conf.addResource("pig-cluster-hadoop-site.xml");
System.out.println("Job Conf = " + conf);
System.out.println("dfs.block.size= " + conf.get("dfs.block.size"));
System.out.println("ipc.client.timeout= " + conf.get("ipc.client.timeout"));
@@ -344,7 +346,7 @@
throw new IOException("Missing Hadoop configuration file");
return new String[] {hdfs, mapred};
} catch (Exception e) {
- mLogger.fatal("Could not connect to HOD", e);
+ log.fatal("Could not connect to HOD", e);
System.exit(4);
}
throw new RuntimeException("Could not scrape needed information.");
@@ -372,9 +374,18 @@
}
}
- public void addJar(String path) throws MalformedURLException{
- extraJars.add(path);
- LogicalPlanBuilder.classloader = createCl(null);
+    /**
+     * Register a local jar file, given by its path, as an extra jar.
+     * The path is turned into a file: URL and handed to addJar(URL).
+     * A null path is silently ignored.
+     * @param path local file system path of the jar; may be null.
+     * @throws MalformedURLException if the path cannot be expressed as a URL.
+     */
+    public void addJar(String path) throws MalformedURLException {
+        if (path == null) {
+            return;
+        }
+        File jarFile = new File(path);
+        addJar(jarFile.toURI().toURL());
+    }
+
+    /**
+     * Add a jar URL to extraJars and rebuild LogicalPlanBuilder's class
+     * loader from the updated list. A null URL is silently ignored.
+     * @param resource URL of the jar to add; may be null.
+     * @throws MalformedURLException as declared; the only call here that
+     * could raise it is createCl.
+     */
+    public void addJar(URL resource) throws MalformedURLException{
+        if (resource == null) {
+            return;
+        }
+        extraJars.add(resource);
+        // Recreate the class loader so it includes the updated extraJars list.
+        LogicalPlanBuilder.classloader = createCl(null);
}
public void rename(String oldName, String newName) throws IOException {
@@ -410,12 +421,6 @@
return conf;
}
- /*
- public Logger getLogger() {
- return mLogger;
- }
- */
-
public void setJobtrackerLocation(String newLocation) {
conf.set("mapred.job.tracker", newLocation);
}
@@ -471,7 +476,7 @@
urls[0] = new URL("file:" + jarFile);
}
for (int i = 0; i < extraJars.size(); i++) {
- urls[i + passedJar] = new URL("file:" + extraJars.get(i));
+ urls[i + passedJar] = extraJars.get(i);
}
return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
}
@@ -506,7 +511,9 @@
// create ClassNotFoundException exception and attach to IOException
// so that we don't need to buble interface changes throughout the code
ClassNotFoundException e = new ClassNotFoundException("Could not resolve " + name + " using imports: " + packageImportList);
- throw new IOException(e);
+ IOException newE = new IOException(e.getMessage());
+ newE.initCause(e);
+ throw newE;
}
private static List<String> parseArguments(String argString){
@@ -548,7 +555,9 @@
ret = objClass.newInstance();
}
}catch(Throwable e){
- throw new IOException(e);
+ IOException newE = new IOException(e.getMessage());
+ newE.initCause(e);
+ throw newE;
}
return ret;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ADD.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ADD.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ADD.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ADD.java Tue Jan 22 13:17:12 2008
@@ -20,20 +20,19 @@
import java.io.IOException;
import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-public class ADD extends EvalFunc<DataAtom> {
+public class ADD extends EvalFunc<Double> {
@Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- double v1 = input.getAtomField(0).numval();
- double v2 = input.getAtomField(1).numval();
-
- output.setValue(v1+v2);
+    public Double exec(Tuple input) throws IOException {
+        // Add fields 0 and 1 of the input. The operands are read as
+        // java.lang.Number, so Integer, Long, Float and Double fields are
+        // all accepted, and each is widened to double before adding. A null
+        // or non-numeric field still fails (NullPointerException /
+        // ClassCastException), as before.
+        double v1 = ((Number)input.get(0)).doubleValue();
+        double v2 = ((Number)input.get(1)).doubleValue();
+        return Double.valueOf(v1 + v2);
}
@Override
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/DIVIDE.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/DIVIDE.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/DIVIDE.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/DIVIDE.java Tue Jan 22 13:17:12 2008
@@ -20,17 +20,16 @@
import java.io.IOException;
import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
import org.apache.pig.data.Tuple;
-public class DIVIDE extends EvalFunc<DataAtom> {
+public class DIVIDE extends EvalFunc<Double> {
@Override
- public void exec(Tuple input, DataAtom output) throws IOException {
- double v1 = input.getAtomField(0).numval();
- double v2 = input.getAtomField(1).numval();
- output.setValue(v1/v2);
+    public Double exec(Tuple input) throws IOException {
+        // Divide field 0 by field 1. Both operands are read as
+        // java.lang.Number and widened to double, so Integer, Long and Float
+        // fields work as well as Double. The division is done in double
+        // arithmetic, so a zero divisor yields Infinity or NaN rather than
+        // an exception.
+        double v1 = ((Number)input.get(0)).doubleValue();
+        double v2 = ((Number)input.get(1)).doubleValue();
+        return Double.valueOf(v1 / v2);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java Tue Jan 22 13:17:12 2008
@@ -21,36 +21,39 @@
import java.util.Iterator;
import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.Datum;
public class FindQuantiles extends EvalFunc<DataBag>{
-
- /**
- * first field in the input tuple is the number of quantiles to generate
- * second field is the *sorted* bag of samples
- */
-
- @Override
- public void exec(Tuple input, DataBag output) throws IOException {
- int numQuantiles = input.getAtomField(0).numval().intValue();
- DataBag samples = input.getBagField(1);
-
- int numSamples = samples.cardinality();
-
- int toSkip = numSamples / numQuantiles;
-
- int i=0, nextQuantile = 0;
- Iterator<Datum> iter = samples.content();
- while (iter.hasNext()){
- Tuple t = (Tuple)iter.next();
- if (i==nextQuantile){
- output.add(t);
- nextQuantile+=toSkip+1;
- }
- i++;
- }
- }
+    BagFactory mBagFactory = BagFactory.getInstance();
+
+    /**
+     * Select quantile boundary samples from a sorted sample bag.
+     * <p>
+     * The first field in the input tuple is the number of quantiles to
+     * generate; any java.lang.Number is accepted and its int value is used.
+     * The second field is the *sorted* bag of samples. Starting with the
+     * first sample, every (numSamples / numQuantiles + 1)-th tuple is
+     * copied into the returned bag.
+     * @param input tuple of (number of quantiles, sorted sample bag).
+     * @return a new bag holding the selected sample tuples.
+     * @throws IOException if the number of quantiles is missing or not
+     * positive, or if a field cannot be read from the tuple.
+     */
+    @Override
+    public DataBag exec(Tuple input) throws IOException {
+        Number quantileArg = (Number)input.get(0);
+        // Guard against a zero count (division by zero) and a negative one,
+        // which would make the step non-positive and select garbage.
+        if (quantileArg == null || quantileArg.intValue() <= 0) {
+            throw new IOException("FindQuantiles: number of quantiles must "
+                + "be a positive number, got " + quantileArg);
+        }
+        int numQuantiles = quantileArg.intValue();
+        DataBag samples = (DataBag)input.get(1);
+        DataBag output = mBagFactory.newDefaultBag();
+
+        long numSamples = samples.size();
+        // Number of samples passed over between two selected samples.
+        long toSkip = numSamples / numQuantiles;
+
+        long i = 0, nextQuantile = 0;
+        Iterator<Tuple> iter = samples.iterator();
+        while (iter.hasNext()) {
+            Tuple t = iter.next();
+            if (i == nextQuantile) {
+                output.add(t);
+                nextQuantile += toSkip + 1;
+            }
+            i++;
+        }
+        return output;
+    }
}