You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC
svn commit: r883836 [20/23] - in /hadoop/pig/branches/load-store-redesign:
./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/
contrib/zebra/ contrib/zebr...
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Tue Nov 24 19:54:19 2009
@@ -50,7 +50,7 @@
protected static final long serialVersionUID = 1L;
- protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
+ protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
protected static BagFactory mBagFactory = BagFactory.getInstance();
protected List<PhysicalPlan> plans;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java Tue Nov 24 19:54:19 2009
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -26,6 +27,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
@@ -42,6 +44,10 @@
private static final long serialVersionUID = 1L;
+ private boolean[] mInnerFlags;
+
+ // The schema is used only by the MRCompiler to support outer join
+ transient private List<Schema> inputSchema = new ArrayList<Schema>();
transient private static Log log = LogFactory.getLog(POSkewedJoin.class);
@@ -51,19 +57,30 @@
private MultiMap<PhysicalOperator, PhysicalPlan> mJoinPlans;
+    /** Delegates to the four-argument constructor with rp = -1 and null inputs/flags. */
public POSkewedJoin(OperatorKey k) {
- this(k,-1,null);
+ this(k,-1,null, null);
}
+    /** Delegates to the four-argument constructor with null inputs and null inner flags. */
public POSkewedJoin(OperatorKey k, int rp) {
- this(k, rp, null);
+ this(k, rp, null, null);
}
- public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp) {
- this(k, -1, inp);
+    /**
+     * Delegates to the four-argument constructor with rp = -1.
+     * {@code flags} may be null; presumably it holds one inner-join flag per
+     * input -- TODO confirm against the callers.
+     */
+ public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp, boolean []flags) {
+ this(k, -1, inp, flags);
}
- public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) {
- super(k,rp,inp);
+    /**
+     * Creates a skewed-join operator.
+     *
+     * @param k     operator key
+     * @param rp    passed through to the PhysicalOperator constructor
+     *              (presumably the requested parallelism)
+     * @param inp   input operators
+     * @param flags inner-join flags; a private copy is kept so later changes to
+     *              the caller's array do not affect this operator. When null,
+     *              no flags are recorded.
+     */
+    public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, boolean []flags) {
+        super(k, rp, inp);
+        // Defensive copy of the caller's inner flags (null stays null).
+        mInnerFlags = (flags == null) ? null : flags.clone();
+    }
+
+    /**
+     * Returns the inner-join flags supplied at construction, or {@code null} if
+     * none were supplied. A copy is returned so callers cannot change this
+     * operator's state through it; the constructor likewise keeps its own copy
+     * of the caller's array.
+     *
+     * @return a fresh copy of the inner flags, or {@code null}
+     */
+    public boolean[] getInnerFlags() {
+        return (mInnerFlags == null) ? null : mInnerFlags.clone();
}
public MultiMap<PhysicalOperator, PhysicalPlan> getJoinPlans() {
@@ -93,5 +110,13 @@
public boolean supportsMultipleOutputs() {
return false;
}
+
+    /**
+     * Appends the schema of the next join input. Schemas are stored in the
+     * order they are added, so {@link #getSchema(int)} looks them up by position.
+     * NOTE(review): inputSchema is transient and only initialized at its
+     * declaration, so after Java deserialization it would be null; these
+     * accessors are presumably only used before serialization -- confirm.
+     *
+     * @param s schema of the input
+     */
+    public void addSchema(Schema s) {
+        this.inputSchema.add(s);
+    }
+
+    /**
+     * Returns the schema stored at position {@code i} by {@link #addSchema}.
+     *
+     * @param i zero-based position
+     * @return the schema at that position
+     * @throws IndexOutOfBoundsException if fewer than {@code i + 1} schemas were added
+     */
+    public Schema getSchema(int i) {
+        return this.inputSchema.get(i);
+    }
}
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSortedDistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSortedDistinct.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSortedDistinct.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSortedDistinct.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * This operator is a variation of PODistinct, the input to this operator
+ * must be sorted already.
+ *
+ */
+public class POSortedDistinct extends PODistinct {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private transient Tuple lastTuple;
+
+ public POSortedDistinct(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp, inp);
+ }
+
+ public POSortedDistinct(OperatorKey k, int rp) {
+ super(k, rp);
+ }
+
+ public POSortedDistinct(OperatorKey k, List<PhysicalOperator> inp) {
+ super(k, inp);
+ }
+
+ public POSortedDistinct(OperatorKey k) {
+ super(k);
+ }
+
+
+ public Result getNext(Tuple t) throws ExecException {
+ while(true) {
+ Result in = processInput();
+ if (in.returnStatus == POStatus.STATUS_NULL) {
+ continue;
+ }
+
+ if (in.returnStatus == POStatus.STATUS_OK) {
+ if (lastTuple == null || !lastTuple.equals(in.result)) {
+ lastTuple = (Tuple)in.result;
+ return in;
+ } else {
+ continue;
+ }
+ }
+
+ if (in.returnStatus == POStatus.STATUS_EOP) {
+ if (!isAccumulative() || !isAccumStarted()) {
+ lastTuple = null;
+ }
+ return in;
+ }
+
+ // if there is an error, just return
+ return in;
+ }
+ }
+}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Tue Nov 24 19:54:19 2009
@@ -95,12 +95,6 @@
Map<E, Pair<Integer, Integer> > reducerMap = new HashMap<E, Pair<Integer, Integer> >();
- InputStream is;
- if (job != null) {
- is = FileLocalizer.openDFSFile(keyDistFile,ConfigurationUtil.toProperties(job));
- } else {
- is = FileLocalizer.openDFSFile(keyDistFile);
- }
ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(), job,
keyDistFile, 0);
DataBag partitionList;
@@ -142,13 +136,7 @@
}
}
// number of reducers
- Integer cnt = 0;
- if (minIndex < maxIndex) {
- cnt = maxIndex - minIndex;
- } else {
- cnt = totalReducers[0] + maxIndex - minIndex;
- }
-
+ Integer cnt = maxIndex - minIndex;
reducerMap.put(keyT, new Pair(minIndex, cnt));// 1 is added to account for the 0 index
}
return reducerMap;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java Tue Nov 24 19:54:19 2009
@@ -24,9 +24,9 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Scanner;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import org.apache.pig.Slice;
@@ -61,7 +61,7 @@
/** The connection to the table in Hbase **/
private transient HTable m_table;
/** The scanner over the table **/
- private transient Scanner m_scanner;
+ private transient ResultScanner m_scanner;
private transient ArrayList<Object> mProtoTuple;
@@ -153,7 +153,8 @@
@Override
public void init(DataStorage store) throws IOException {
LOG.info("Init Hbase Slice " + this);
- HBaseConfiguration conf = new HBaseConfiguration();
+
+ HBaseConfiguration conf=new HBaseConfiguration();
// connect to the given table
m_table = new HTable(conf, m_tableName);
// init the scanner
@@ -178,17 +179,18 @@
* @throws IOException
*/
private void restart(byte[] startRow) throws IOException {
+        // NOTE(review): the removed getScanner(m_inputColumns, ...) calls limited
+        // the scan to m_inputColumns, but the Scan built below is given no
+        // columns, so it presumably fetches every column of each row
+        // (convertResultToTuple still projects m_inputColumns). Consider
+        // restricting the Scan to those columns -- TODO confirm the
+        // column-selection API of the HBase version in use.
+ Scan scan;
if ((m_endRow != null) && (m_endRow.length > 0)) {
- this.m_scanner = this.m_table.getScanner(m_inputColumns, startRow,
- m_endRow);
+        // an end row is configured: scan from startRow bounded by m_endRow
+ scan = new Scan(startRow, m_endRow);
} else {
- this.m_scanner = this.m_table.getScanner(m_inputColumns, startRow);
+        // no end row configured: only the start row is given
+ scan = new Scan(startRow);
}
+ this.m_scanner = this.m_table.getScanner(scan);
}
@Override
public boolean next(Tuple value) throws IOException {
- RowResult result;
+ Result result;
try {
result = this.m_scanner.next();
} catch (UnknownScannerException e) {
@@ -215,15 +217,14 @@
* @param tuple
* tuple
*/
- private void convertResultToTuple(RowResult result, Tuple tuple) {
+ private void convertResultToTuple(Result result, Tuple tuple) {
if (mProtoTuple == null)
mProtoTuple = new ArrayList<Object>();
- Cell cell = null;
byte[] value = null;
for (byte[] column : m_inputColumns) {
- cell = result.get(column);
- if (cell == null || (value = cell.getValue()) == null) {
+ value = result.getValue(column);
+ if (value == null) {
mProtoTuple.add(null);
} else {
mProtoTuple.add(new DataByteArray(value));
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/datastorage/LocalPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/datastorage/LocalPath.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/datastorage/LocalPath.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/datastorage/LocalPath.java Tue Nov 24 19:54:19 2009
@@ -26,12 +26,15 @@
import java.util.HashMap;
import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.datastorage.SeekableInputStream;
public abstract class LocalPath implements ElementDescriptor {
+ private Log log = LogFactory.getLog(getClass());
protected DataStorage fs;
protected File path;
@@ -121,7 +124,9 @@
}
public void delete() throws IOException {
- getCurPath().delete();
+ boolean res = getCurPath().delete();
+ if (!res)
+ log.warn("LocalPath.delete: failed to delete" + getCurPath());
}
public Properties getConfiguration() throws IOException {
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java Tue Nov 24 19:54:19 2009
@@ -268,10 +268,6 @@
return result;
}
- public boolean equals(Object obj) {
- return this.equals(obj);
- }
-
}
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/AVG.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/AVG.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/AVG.java Tue Nov 24 19:54:19 2009
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -41,7 +42,7 @@
* Generates the average of the values of the first field of a tuple. This class is Algebraic in
* implemenation, so if possible the execution will be split into a local and global application
*/
-public class AVG extends EvalFunc<Double> implements Algebraic {
+public class AVG extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -178,6 +179,7 @@
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
Double d = (Double)t.get(0);
+
// we count nulls in avg as contributing 0
// a departure from SQL for performance of
// COUNT() which implemented by just inspecting
@@ -261,5 +263,53 @@
funcList.add(new FuncSpec(IntAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
funcList.add(new FuncSpec(LongAvg.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
return funcList;
+ }
+
+    /* Accumulator interface implementation */
+
+    // Running totals across accumulate() calls. Both stay null until a batch
+    // with a non-null sum is seen, and cleanup() resets them to null.
+    private Double intermediateSum = null;
+    private Double intermediateCount = null;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double sum = sum(b);
+            if (sum == null) {
+                // nothing to add from this batch
+                return;
+            }
+            // set default values on the first contributing batch
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0.0;
+                intermediateCount = 0.0;
+            }
+
+            double count = (Long) count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    /**
+     * Returns the average of the values accumulated since the last cleanup(),
+     * or null when nothing was counted. This includes the cases where
+     * accumulate() was never called or every batch had a null sum.
+     */
+    @Override
+    public Double getValue() {
+        // intermediateCount is still null if no batch contributed a sum;
+        // unboxing it unguarded would throw a NullPointerException.
+        if (intermediateCount == null || intermediateCount <= 0) {
+            return null;
+        }
+        return intermediateSum / intermediateCount;
}
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Tue Nov 24 19:54:19 2009
@@ -323,6 +323,10 @@
return new BinStorageInputFormat();
}
+    /**
+     * Returns the same hash code for every instance.
+     * NOTE(review): this is consistent with equals() only if equals() treats
+     * all BinStorage instances as equal -- confirm against equals().
+     */
+    @Override
+    public int hashCode() {
+        return 42;
+    }
+
@Override
public LoadCaster getLoadCaster() {
return this;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinaryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinaryStorage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinaryStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinaryStorage.java Tue Nov 24 19:54:19 2009
@@ -146,6 +146,10 @@
return true;
}
+    /**
+     * Returns the same hash code for every instance.
+     * NOTE(review): presumably paired with an equals() that returns true for
+     * any BinaryStorage, which keeps the equals/hashCode contract -- confirm.
+     */
+    @Override
+    public int hashCode() {
+        return 42;
+    }
+
@Override
public InputFormat getInputFormat() throws IOException {
return null;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.Map;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -36,7 +37,7 @@
* Generates the count of the values of the first field of a tuple. This class is Algebraic in
* implemenation, so if possible the execution will be split into a local and global functions
*/
-public class COUNT extends EvalFunc<Long> implements Algebraic{
+public class COUNT extends EvalFunc<Long> implements Algebraic, Accumulator<Long>{
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@Override
@@ -136,5 +137,38 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+
+ /* Accumulator interface implementation */
+ private long intermediateCount = 0L;
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ DataBag bag = (DataBag)b.get(0);
+ Iterator it = bag.iterator();
+ while (it.hasNext()){
+ Tuple t = (Tuple)it.next();
+ if (t != null && t.size() > 0 && t.get(0) != null) {
+ intermediateCount += 1;
+ }
+ }
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing min in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ intermediateCount = 0L;
+ }
+
+ @Override
+ public Long getValue() {
+ return intermediateCount;
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT_STAR.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT_STAR.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT_STAR.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/COUNT_STAR.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.Map;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -38,7 +39,7 @@
* implements SQL COUNT(*) semantics. This class is Algebraic in
* implemenation, so if possible the execution will be split into a local and global functions
*/
-public class COUNT_STAR extends EvalFunc<Long> implements Algebraic{
+public class COUNT_STAR extends EvalFunc<Long> implements Algebraic, Accumulator<Long>{
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@Override
@@ -127,4 +128,31 @@
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+ /* Accumulator interface imlpemenatation */
+
+ private long intermediateCount = 0L;
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ intermediateCount += sum(b);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing min in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ intermediateCount = 0L;
+ }
+
+ @Override
+ public Long getValue() {
+ return intermediateCount;
+ }
+
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleAvg.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleAvg.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleAvg.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -38,7 +39,7 @@
* Generates the average of the values of the first field of a tuple. This class is Algebraic in
* implemenation, so if possible the execution will be split into a local and global application
*/
-public class DoubleAvg extends EvalFunc<Double> implements Algebraic {
+public class DoubleAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -233,4 +234,52 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+    /* Accumulator interface */
+
+    // Running totals across accumulate() calls. Both stay null until a batch
+    // with a non-null sum is seen, and cleanup() resets them to null.
+    private Double intermediateSum = null;
+    private Double intermediateCount = null;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double sum = sum(b);
+            if (sum == null) {
+                // nothing to add from this batch
+                return;
+            }
+            // set default values on the first contributing batch
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0.0;
+                intermediateCount = 0.0;
+            }
+
+            double count = (Long) count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    /**
+     * Returns the average of the values accumulated since the last cleanup(),
+     * or null when nothing was counted. This includes the cases where
+     * accumulate() was never called or every batch had a null sum.
+     */
+    @Override
+    public Double getValue() {
+        // intermediateCount is still null if no batch contributed a sum;
+        // unboxing it unguarded would throw a NullPointerException.
+        if (intermediateCount == null || intermediateCount <= 0) {
+            return null;
+        }
+        return intermediateSum / intermediateCount;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMax.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMax.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMax.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -34,7 +35,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class DoubleMax extends EvalFunc<Double> implements Algebraic {
+public class DoubleMax extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -154,5 +155,40 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+    /* Accumulator interface implementation */
+
+    /** Largest value seen since the last cleanup(); null until a non-null batch maximum arrives. */
+    private Double intermediateMax = null;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double batchMax = max(b);
+            if (batchMax != null) {
+                // On the first contributing batch, start from -infinity so Math.max yields batchMax.
+                double current = (intermediateMax == null)
+                        ? Double.NEGATIVE_INFINITY
+                        : intermediateMax;
+                intermediateMax = java.lang.Math.max(current, batchMax);
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateMax;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleMin.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the min of the Double values in the first field of a tuple.
*/
-public class DoubleMin extends EvalFunc<Double> implements Algebraic {
+public class DoubleMin extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+    /* Accumulator interface implementation */
+
+    /** Smallest value seen since the last cleanup(); null until a non-null batch minimum arrives. */
+    private Double intermediateMin = null;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double batchMin = min(b);
+            if (batchMin != null) {
+                // On the first contributing batch, start from +infinity so Math.min yields batchMin.
+                double current = (intermediateMin == null)
+                        ? Double.POSITIVE_INFINITY
+                        : intermediateMin;
+                intermediateMin = java.lang.Math.min(current, batchMin);
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateMin;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleSum.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleSum.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/DoubleSum.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -38,7 +39,7 @@
/**
* Generates the sum of the values of the first field of a tuple.
*/
-public class DoubleSum extends EvalFunc<Double> implements Algebraic {
+public class DoubleSum extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -160,4 +161,34 @@
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+    /* Accumulator interface implementation */
+
+    /** Total of all batch sums since the last cleanup(); null until a non-null batch sum arrives. */
+    private Double intermediateSum = null;
+
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double batchSum = sum(b);
+            if (batchSum == null) {
+                return;
+            }
+            double runningTotal = 0.0;
+            if (intermediateSum != null) {
+                runningTotal = intermediateSum;
+            }
+            intermediateSum = runningTotal + batchSum;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateSum;
+    }
+
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatAvg.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatAvg.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatAvg.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
* Generates the average of the values of the first field of a tuple. This class is Algebraic in
* implementation, so if possible the execution will be split into a local and global application
*/
-public class FloatAvg extends EvalFunc<Double> implements Algebraic {
+public class FloatAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -230,5 +231,53 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+    /* Accumulator interface */
+
+    // Running totals across accumulate() calls. Both stay null until an input
+    // with a non-null sum is seen, and are reset to null by cleanup().
+    private Double intermediateSum = null;
+    private Double intermediateCount = null;
+
+    /**
+     * Adds the sum and count of b to the running totals. Inputs whose sum is
+     * null are ignored. ExecExceptions propagate unchanged; anything else is
+     * wrapped as error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double sum = sum(b);
+            if (sum == null) {
+                return;
+            }
+            // lazily initialise the totals on the first non-null input
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0.0;
+                intermediateCount = 0.0;
+            }
+
+            double count = (Long)count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running sum and count. */
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    /**
+     * Returns the average of everything accumulated so far, or null when no
+     * non-null value has been accumulated (including right after cleanup()).
+     */
+    @Override
+    public Double getValue() {
+        // intermediateCount is null until a non-null sum has been accumulated;
+        // unboxing it unguarded would throw a NullPointerException.
+        if (intermediateCount == null || intermediateCount <= 0) {
+            return null;
+        }
+        return intermediateSum / intermediateCount;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMax.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMax.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMax.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class FloatMax extends EvalFunc<Float> implements Algebraic {
+public class FloatMax extends EvalFunc<Float> implements Algebraic, Accumulator<Float> {
@Override
public Float exec(Tuple input) throws IOException {
@@ -152,4 +153,39 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.FLOAT));
}
+
+    /* Accumulator interface */
+
+    // Largest value seen so far; stays null until an input yields a non-null max.
+    private Float intermediateMax = null;
+
+    /**
+     * Folds max(b) into the running maximum; inputs whose max is null are
+     * ignored. ExecExceptions propagate unchanged; anything else is wrapped as
+     * error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Float curMax = max(b);
+            if (curMax == null) {
+                return;
+            }
+            /* on the first non-null input, seed intermediateMax with negative infinity */
+            if (intermediateMax == null) {
+                intermediateMax = Float.NEGATIVE_INFINITY;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running maximum. */
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    /** Returns the running maximum, or null if nothing non-null was accumulated. */
+    @Override
+    public Float getValue() {
+        return intermediateMax;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatMin.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the min of the Float values in the first field of a tuple.
*/
-public class FloatMin extends EvalFunc<Float> implements Algebraic {
+public class FloatMin extends EvalFunc<Float> implements Algebraic, Accumulator<Float> {
@Override
public Float exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.FLOAT));
}
+
+    /* Accumulator interface implementation */
+    // Smallest value seen so far; stays null until an input yields a non-null min.
+    private Float intermediateMin = null;
+
+    /**
+     * Folds min(b) into the running minimum; inputs whose min is null are
+     * ignored. ExecExceptions propagate unchanged; anything else is wrapped as
+     * error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Float bagMin = min(b);
+            if (bagMin != null) {
+                // positive infinity is the identity for min, used to seed the first input
+                float current = (intermediateMin == null) ? Float.POSITIVE_INFINITY : intermediateMin;
+                intermediateMin = java.lang.Math.min(current, bagMin);
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running minimum. */
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    /** Returns the running minimum, or null if nothing non-null was accumulated. */
+    @Override
+    public Float getValue() {
+        return intermediateMin;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatSum.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatSum.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/FloatSum.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
/**
* Generates the sum of the Float values in the first field of a tuple.
*/
-public class FloatSum extends EvalFunc<Double> implements Algebraic {
+public class FloatSum extends EvalFunc<Double> implements Algebraic, Accumulator<Double>{
@Override
public Double exec(Tuple input) throws IOException {
@@ -196,5 +197,35 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+    /* Accumulator interface implementation*/
+    // Running total; stays null until an input yields a non-null sum.
+    private Double intermediateSum = null;
+
+    /**
+     * Adds sum(b) to the running total; inputs whose sum is null are ignored.
+     * ExecExceptions propagate unchanged; anything else is wrapped as error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curSum = sum(b);
+            if (curSum == null) {
+                return;
+            }
+            intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running total. */
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    /** Returns the running total, or null if nothing non-null was accumulated. */
+    @Override
+    public Double getValue() {
+        return intermediateSum;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntAvg.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntAvg.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntAvg.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
* Generates the average of the values of the first field of a tuple. This class is Algebraic in
* implementation, so if possible the execution will be split into a local and global application
*/
-public class IntAvg extends EvalFunc<Double> implements Algebraic {
+public class IntAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -236,4 +237,51 @@
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+    /* Accumulator interface */
+
+    // Running totals across accumulate() calls. Both stay null until an input
+    // with a non-null sum is seen, and are reset to null by cleanup().
+    private Long intermediateSum = null;
+    private Double intermediateCount = null;
+
+    /**
+     * Adds the sum and count of b to the running totals. Inputs whose sum is
+     * null are ignored. ExecExceptions propagate unchanged; anything else is
+     * wrapped as error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long sum = sum(b);
+            if (sum == null) {
+                return;
+            }
+            // lazily initialise the totals on the first non-null input
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0L;
+                intermediateCount = 0.0;
+            }
+
+            double count = (Long)count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running sum and count. */
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    /**
+     * Returns the average of everything accumulated so far, or null when no
+     * non-null value has been accumulated (including right after cleanup()).
+     */
+    @Override
+    public Double getValue() {
+        // intermediateCount is null until a non-null sum has been accumulated;
+        // unboxing it unguarded would throw a NullPointerException.
+        if (intermediateCount == null || intermediateCount <= 0) {
+            return null;
+        }
+        // long / double promotes to double division
+        return intermediateSum / intermediateCount;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMax.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMax.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMax.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class IntMax extends EvalFunc<Integer> implements Algebraic {
+public class IntMax extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> {
@Override
public Integer exec(Tuple input) throws IOException {
@@ -153,4 +154,39 @@
return new Schema(new Schema.FieldSchema(null, DataType.INTEGER));
}
+    /* Accumulator interface */
+
+    // Largest value seen so far; stays null until an input yields a non-null max.
+    private Integer intermediateMax = null;
+
+    /**
+     * Folds max(b) into the running maximum; inputs whose max is null are
+     * ignored. ExecExceptions propagate unchanged; anything else is wrapped as
+     * error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Integer bagMax = max(b);
+            if (bagMax == null) {
+                return;
+            }
+            // Integer.MIN_VALUE is the identity for max, used to seed the first input
+            int current = (intermediateMax != null) ? intermediateMax : Integer.MIN_VALUE;
+            intermediateMax = java.lang.Math.max(current, bagMax);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running maximum. */
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    /** Returns the running maximum, or null if nothing non-null was accumulated. */
+    @Override
+    public Integer getValue() {
+        return intermediateMax;
+    }
+
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntMin.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
/**
* Generates the min of the Integer values in the first field of a tuple.
*/
-public class IntMin extends EvalFunc<Integer> implements Algebraic {
+public class IntMin extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> {
@Override
public Integer exec(Tuple input) throws IOException {
@@ -154,4 +155,38 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.INTEGER));
}
+
+    /* Accumulator interface implementation */
+    // Smallest value seen so far; stays null until an input yields a non-null min.
+    private Integer intermediateMin = null;
+
+    /**
+     * Folds min(b) into the running minimum; inputs whose min is null are
+     * ignored. ExecExceptions propagate unchanged; anything else is wrapped as
+     * error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Integer bagMin = min(b);
+            if (bagMin != null) {
+                // Integer.MAX_VALUE is the identity for min, used to seed the first input
+                int current = (intermediateMin == null) ? Integer.MAX_VALUE : intermediateMin;
+                intermediateMin = java.lang.Math.min(current, bagMin);
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running minimum. */
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    /** Returns the running minimum, or null if nothing non-null was accumulated. */
+    @Override
+    public Integer getValue() {
+        return intermediateMin;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntSum.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntSum.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/IntSum.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -34,7 +35,7 @@
/**
* Generates the sum of the Integer in the first field of a tuple.
*/
-public class IntSum extends EvalFunc<Long> implements Algebraic {
+public class IntSum extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
@Override
public Long exec(Tuple input) throws IOException {
@@ -198,4 +199,33 @@
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+    /* Accumulator interface implementation*/
+    // Running total; stays null until an input yields a non-null sum.
+    private Long intermediateSum = null;
+
+    /**
+     * Adds sum(b) to the running total; inputs whose sum is null are ignored.
+     * ExecExceptions propagate unchanged; anything else is wrapped as error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curSum = sum(b);
+            if (curSum == null) {
+                return;
+            }
+            intermediateSum = (intermediateSum == null ? 0L : intermediateSum) + curSum;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running total. */
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    /** Returns the running total, or null if nothing non-null was accumulated. */
+    @Override
+    public Long getValue() {
+        return intermediateSum;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongAvg.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongAvg.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongAvg.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
* Generates the average of the values of the first field of a tuple. This class is Algebraic in
* implementation, so if possible the execution will be split into a local and global application
*/
-public class LongAvg extends EvalFunc<Double> implements Algebraic {
+public class LongAvg extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -230,5 +231,53 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
}
+
+    /* Accumulator interface */
+
+    // Running totals across accumulate() calls. Both stay null until an input
+    // with a non-null sum is seen, and are reset to null by cleanup().
+    private Long intermediateSum = null;
+    private Double intermediateCount = null;
+
+    /**
+     * Adds the sum and count of b to the running totals. Inputs whose sum is
+     * null are ignored. ExecExceptions propagate unchanged; anything else is
+     * wrapped as error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long sum = sum(b);
+            if (sum == null) {
+                return;
+            }
+            // lazily initialise the totals on the first non-null input
+            if (intermediateSum == null || intermediateCount == null) {
+                intermediateSum = 0L;
+                intermediateCount = 0.0;
+            }
+
+            double count = (Long)count(b);
+
+            if (count > 0) {
+                intermediateCount += count;
+                intermediateSum += sum;
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing average in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running sum and count. */
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+        intermediateCount = null;
+    }
+
+    /**
+     * Returns the average of everything accumulated so far, or null when no
+     * non-null value has been accumulated (including right after cleanup()).
+     */
+    @Override
+    public Double getValue() {
+        // intermediateCount is null until a non-null sum has been accumulated;
+        // unboxing it unguarded would throw a NullPointerException.
+        if (intermediateCount == null || intermediateCount <= 0) {
+            return null;
+        }
+        // long / double promotes to double division
+        return intermediateSum / intermediateCount;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMax.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMax.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMax.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class LongMax extends EvalFunc<Long> implements Algebraic {
+public class LongMax extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
@Override
public Long exec(Tuple input) throws IOException {
@@ -152,4 +153,39 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+
+    /* Accumulator interface */
+
+    // Largest value seen so far; stays null until an input yields a non-null max.
+    private Long intermediateMax = null;
+
+    /**
+     * Folds max(b) into the running maximum; inputs whose max is null are
+     * ignored. ExecExceptions propagate unchanged; anything else is wrapped as
+     * error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curMax = max(b);
+            if (curMax == null) {
+                return;
+            }
+            /* on the first non-null input, seed intermediateMax with Long.MIN_VALUE */
+            if (intermediateMax == null) {
+                intermediateMax = Long.MIN_VALUE;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running maximum. */
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    /** Returns the running maximum, or null if nothing non-null was accumulated. */
+    @Override
+    public Long getValue() {
+        return intermediateMax;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongMin.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the min of the Long values in the first field of a tuple.
*/
-public class LongMin extends EvalFunc<Long> implements Algebraic {
+public class LongMin extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
@Override
public Long exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+
+    /* Accumulator interface implementation */
+    // Smallest value seen so far; stays null until an input yields a non-null min.
+    private Long intermediateMin = null;
+
+    /**
+     * Folds min(b) into the running minimum; inputs whose min is null are
+     * ignored. ExecExceptions propagate unchanged; anything else is wrapped as
+     * error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long bagMin = min(b);
+            if (bagMin != null) {
+                // Long.MAX_VALUE is the identity for min, used to seed the first input
+                long current = (intermediateMin == null) ? Long.MAX_VALUE : intermediateMin;
+                intermediateMin = java.lang.Math.min(current, bagMin);
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running minimum. */
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    /** Returns the running minimum, or null if nothing non-null was accumulated. */
+    @Override
+    public Long getValue() {
+        return intermediateMin;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongSum.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongSum.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/LongSum.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
/**
* Generates the sum of the Long values in the first field of a tuple.
*/
-public class LongSum extends EvalFunc<Long> implements Algebraic {
+public class LongSum extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
@Override
public Long exec(Tuple input) throws IOException {
@@ -155,5 +156,35 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+
+    /* Accumulator interface implementation*/
+    // Running total; stays null until an input yields a non-null sum.
+    private Long intermediateSum = null;
+
+    /**
+     * Adds sum(b) to the running total; inputs whose sum is null are ignored.
+     * ExecExceptions propagate unchanged; anything else is wrapped as error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curSum = sum(b);
+            if (curSum == null) {
+                return;
+            }
+            intermediateSum = (intermediateSum == null ? 0L : intermediateSum) + curSum;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running total. */
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    /** Returns the running total, or null if nothing non-null was accumulated. */
+    @Override
+    public Long getValue() {
+        return intermediateSum;
+    }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MAX.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MAX.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MAX.java Tue Nov 24 19:54:19 2009
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -40,7 +41,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class MAX extends EvalFunc<Double> implements Algebraic {
+public class MAX extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -216,5 +217,39 @@
funcList.add(new FuncSpec(LongMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
funcList.add(new FuncSpec(StringMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
return funcList;
+ }
+
+    /* Accumulator interface implementation */
+    // Largest value seen so far; stays null until an input yields a non-null max.
+    private Double intermediateMax = null;
+
+    /**
+     * Folds max(b) into the running maximum; inputs whose max is null are
+     * ignored. ExecExceptions propagate unchanged; anything else is wrapped as
+     * error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curMax = max(b);
+            if (curMax == null) {
+                return;
+            }
+            /* on the first non-null input, seed intermediateMax with negative infinity */
+            if (intermediateMax == null) {
+                intermediateMax = Double.NEGATIVE_INFINITY;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running maximum. */
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    /** Returns the running maximum, or null if nothing non-null was accumulated. */
+    @Override
+    public Double getValue() {
+        return intermediateMax;
}
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MIN.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MIN.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/MIN.java Tue Nov 24 19:54:19 2009
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -40,7 +41,7 @@
/**
* Generates the min of the values of the first field of a tuple.
*/
-public class MIN extends EvalFunc<Double> implements Algebraic {
+public class MIN extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -217,5 +218,39 @@
funcList.add(new FuncSpec(LongMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
funcList.add(new FuncSpec(StringMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
return funcList;
+ }
+
+    /* Accumulator interface implementation */
+    // Smallest value seen so far; stays null until an input yields a non-null min.
+    private Double intermediateMin = null;
+
+    /**
+     * Folds min(b) into the running minimum; inputs whose min is null are
+     * ignored. ExecExceptions propagate unchanged; anything else is wrapped as
+     * error 2106.
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double bagMin = min(b);
+            if (bagMin != null) {
+                // positive infinity is the identity for min, used to seed the first input
+                double current = (intermediateMin == null) ? Double.POSITIVE_INFINITY : intermediateMin;
+                intermediateMin = java.lang.Math.min(current, bagMin);
+            }
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /** Discards the running minimum. */
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    /** Returns the running minimum, or null if nothing non-null was accumulated. */
+    @Override
+    public Double getValue() {
+        return intermediateMin;
}
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigDump.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigDump.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigDump.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigDump.java Tue Nov 24 19:54:19 2009
@@ -33,7 +33,7 @@
// XXX: FIXME - make this work with new load-store redesign
public class PigDump implements StoreFunc {
- public static String recordDelimiter = "\n";
+ public static final String recordDelimiter = "\n";
OutputStream os;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java Tue Nov 24 19:54:19 2009
@@ -291,7 +291,10 @@
@Override
public boolean equals(Object obj) {
- return equals((PigStorage)obj);
+        // Anything that is not a PigStorage (including null) is unequal;
+        // PigStorage arguments are compared by the typed equals(PigStorage).
+        return (obj instanceof PigStorage) && equals((PigStorage) obj);
}
public boolean equals(PigStorage other) {
@@ -341,5 +344,8 @@
return LoadFunc.getAbsolutePath(location, curDir);
}
-
+    /**
+     * Hash code derived solely from the field delimiter.
+     * NOTE(review): this must agree with equals(PigStorage); it assumes that
+     * method compares fieldDel. Confirm against its body.
+     */
+    @Override
+    public int hashCode() {
+        return (int)fieldDel;
+    }
+
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/SUM.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/SUM.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/SUM.java Tue Nov 24 19:54:19 2009
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -39,7 +40,7 @@
/**
* Generates the sum of the values of the first field of a tuple.
*/
-public class SUM extends EvalFunc<Double> implements Algebraic {
+public class SUM extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -222,6 +223,36 @@
funcList.add(new FuncSpec(IntSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
funcList.add(new FuncSpec(LongSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
return funcList;
+ }
+
+ /* Accumulator interface implementation*/
+ private Double intermediateSum = null;
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Double curSum = sum(b);
+ if (curSum == null) {
+ return;
+ }
+ intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ intermediateSum = null;
+ }
+
+ @Override
+ public Double getValue() {
+ return intermediateSum;
}
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMax.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMax.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMax.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class StringMax extends EvalFunc<String> implements Algebraic {
+public class StringMax extends EvalFunc<String> implements Algebraic, Accumulator<String> {
@Override
public String exec(Tuple input) throws IOException {
@@ -152,4 +153,39 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
}
+
+
+ /* accumulator interface */
+
+ /** Largest string accumulated so far; null until a non-null chunk maximum arrives. */
+ private String intermediateMax = null;
+
+ /**
+ * Merges the maximum of the given input (as computed by max(b)) into the running maximum.
+ * Strings are ordered by String.compareTo; a null chunk maximum is ignored.
+ *
+ * @param b input for this chunk; presumably wraps a bag of strings — confirm against max()
+ * @throws ExecException rethrown as-is; any other failure is wrapped in an ExecException with error code 2106
+ */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ String curMax = max(b);
+ if (curMax == null) {
+ return;
+ }
+ // Replace the running maximum only when curMax lexicographically follows it,
+ // i.e. when the current maximum compares strictly less than curMax.
+ if (intermediateMax == null || intermediateMax.compareTo(curMax) < 0) {
+ intermediateMax = curMax;
+ }
+
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing max in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ /** Resets the running maximum to null. */
+ @Override
+ public void cleanup() {
+ intermediateMax = null;
+ }
+
+ /** @return the running maximum, or null if nothing non-null has been accumulated */
+ @Override
+ public String getValue() {
+ return intermediateMax;
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/StringMin.java Tue Nov 24 19:54:19 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
/**
* Generates the min of the String values in the first field of a tuple.
*/
-public class StringMin extends EvalFunc<String> implements Algebraic {
+public class StringMin extends EvalFunc<String> implements Algebraic, Accumulator<String> {
@Override
public String exec(Tuple input) throws IOException {
@@ -154,5 +155,39 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
}
+
+ /* accumulator interface */
+ private String intermediateMin = null;
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ String curMin = min(b);
+ if (curMin == null) {
+ return;
+ }
+ // check if it lexicographically follows curMax
+ if (intermediateMin == null || intermediateMin.compareTo(curMin) < 0) {
+ intermediateMin = curMin;
+ }
+
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing max in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ intermediateMin = null;
+ }
+
+ @Override
+ public String getValue() {
+ return intermediateMin;
+ }
}
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/AccumulativeBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/AccumulativeBag.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/AccumulativeBag.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/AccumulativeBag.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
+
+/**
+ * Read-only {@link DataBag} view over tuples held by an {@link AccumulativeTupleBuffer}.
+ * Iteration is delegated to {@code buffer.getTuples(index)}; adding, clearing, sizing,
+ * comparing and (de)serializing are unsupported.
+ */
+public class AccumulativeBag implements DataBag {
+ private static final long serialVersionUID = 1L;
+
+ /** Source of the tuples; transient, so it is not serialized with the bag. */
+ private transient AccumulativeTupleBuffer buffer;
+
+ /** Index passed to buffer.getTuples() when iterating; presumably selects one input of the buffer — confirm. */
+ private final int index;
+
+ public AccumulativeBag(AccumulativeTupleBuffer buffer, int index) {
+ this.buffer = buffer;
+ this.index = index;
+ }
+
+ /** Unsupported: this bag is a read-only view. */
+ public void add(Tuple t) {
+ throw new UnsupportedOperationException("AccumulativeBag does not support add operation");
+ }
+
+ /** Unsupported: this bag is a read-only view. */
+ public void addAll(DataBag b) {
+ throw new UnsupportedOperationException("AccumulativeBag does not support add operation");
+ }
+
+ /** Unsupported: this bag is a read-only view. */
+ public void clear() {
+ throw new UnsupportedOperationException("AccumulativeBag does not support clear operation");
+ }
+
+ public boolean isDistinct() {
+ return false;
+ }
+
+ public boolean isSorted() {
+ return false;
+ }
+
+ /** @return the buffer this bag reads its tuples from */
+ public AccumulativeTupleBuffer getTuplebuffer() {
+ return buffer;
+ }
+
+ /** @return an iterator obtained from {@code buffer.getTuples(index)} */
+ public Iterator<Tuple> iterator() {
+ return buffer.getTuples(index);
+ }
+
+ /** No-op: this view keeps no contents of its own that could go stale. */
+ public void markStale(boolean stale) {
+ }
+
+ /** Unsupported: the number of tuples is not known to this view. */
+ public long size() {
+ throw new UnsupportedOperationException("AccumulativeBag does not support size() operation");
+ }
+
+ /** Holds no tuples itself, so it reports zero memory. */
+ public long getMemorySize() {
+ return 0;
+ }
+
+ /** Holds no tuples itself, so there is nothing to spill. */
+ public long spill() {
+ return 0;
+ }
+
+ public void readFields(DataInput datainput) throws IOException {
+ throw new IOException("AccumulativeBag does not support readFields operation");
+ }
+
+ public void write(DataOutput dataoutput) throws IOException {
+ throw new IOException("AccumulativeBag does not support write operation");
+ }
+
+ /** Unsupported: contents cannot be compared without consuming the buffer. */
+ public int compareTo(Object other) {
+ throw new UnsupportedOperationException("AccumulativeBag does not support compareTo() operation");
+ }
+
+ /** Identity equality: two distinct views are never equal. */
+ @Override
+ public boolean equals(Object other) {
+ return this == other;
+ }
+
+ /**
+ * Identity hash, consistent with {@link #equals(Object)}. Unlike an assert-based stub,
+ * this never throws when assertions are enabled.
+ */
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(this);
+ }
+
+}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java Tue Nov 24 19:54:19 2009
@@ -29,6 +29,7 @@
import java.util.ArrayList;
import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
@@ -161,7 +162,9 @@
mContents.clear();
if (mSpillFiles != null) {
for (int i = 0; i < mSpillFiles.size(); i++) {
- mSpillFiles.get(i).delete();
+ boolean res = mSpillFiles.get(i).delete();
+ if (!res)
+ warn ("DefaultAbstractBag.clear: failed to delete " + mSpillFiles.get(i), PigWarning.DELETE_FAILED, null);
}
mSpillFiles.clear();
}
@@ -298,7 +301,10 @@
protected void finalize() {
if (mSpillFiles != null) {
for (int i = 0; i < mSpillFiles.size(); i++) {
- mSpillFiles.get(i).delete();
+ boolean res = mSpillFiles.get(i).delete();
+ if (!res)
+ warn ("DefaultAbstractBag.finalize: failed to delete " + mSpillFiles.get(i), PigWarning.DELETE_FAILED, null);
+
}
}
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java Tue Nov 24 19:54:19 2009
@@ -203,6 +203,17 @@
public int compareTo(TContainer other) {
return tuple.compareTo(other.tuple);
}
+
+ public boolean equals(Object obj){
+ if (obj instanceof TContainer)
+ return tuple.equals(((TContainer)obj).tuple);
+ else
+ return false;
+ }
+
+ public int hashCode() {
+ return tuple.hashCode();
+ }
}
// We have to buffer a tuple because there's no easy way for next
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java Tue Nov 24 19:54:19 2009
@@ -83,6 +83,9 @@
return (o == this);
}
+ // Constant hash: trivially consistent with any equals() implementation, though every
+ // instance falls into the same bucket of a hash-based collection.
+ public int hashCode() {
+ return 42;
+ }
}
public InternalSortedBag() {
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/NonSpillableDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/NonSpillableDataBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/NonSpillableDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/NonSpillableDataBag.java Tue Nov 24 19:54:19 2009
@@ -192,6 +192,10 @@
return compareTo(obj) == 0;
}
+ public int hashCode() {
+ return mContents.hashCode();
+ }
+
@SuppressWarnings("unchecked")
@Override
public int compareTo(Object other) {
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SingleTupleBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SingleTupleBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SingleTupleBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SingleTupleBag.java Tue Nov 24 19:54:19 2009
@@ -157,6 +157,15 @@
return 0;
}
+ /**
+ * Any two SingleTupleBags compare equal, mirroring compareTo(), which appears to be a
+ * stub returning 0 unconditionally. Null and non-SingleTupleBag arguments are never
+ * equal, as the Object.equals contract requires (x.equals(null) must be false, and
+ * equality must be symmetric). The constant hashCode() is consistent with this.
+ * TODO: match to compareTo if it is updated
+ */
+ public boolean equals(Object o){
+ return o instanceof SingleTupleBag;
+ }
+
+ public int hashCode() {
+ return 42;
+ }
+
class TBIterator implements Iterator<Tuple> {
boolean nextDone = false;
/* (non-Javadoc)
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java Tue Nov 24 19:54:19 2009
@@ -71,6 +71,10 @@
return false;
}
+ // Constant hash: trivially consistent with any equals() implementation, though every
+ // instance falls into the same bucket of a hash-based collection.
+ public int hashCode() {
+ return 42;
+ }
+
}
/**
@@ -177,6 +181,17 @@
public int compareTo(PQContainer other) {
return mComp.compare(tuple, other.tuple);
}
+
+ public boolean equals(Object other) {
+ if (other instanceof PQContainer)
+ return tuple.equals(((PQContainer)other).tuple);
+ else
+ return false;
+ }
+
+ public int hashCode() {
+ return tuple.hashCode();
+ }
}
// We have to buffer a tuple because there's no easy way for next
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/TargetedTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/TargetedTuple.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/TargetedTuple.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/TargetedTuple.java Tue Nov 24 19:54:19 2009
@@ -151,6 +151,15 @@
return t.compareTo(o);
}
+ /**
+ * Equality delegates to the wrapped tuple {@code t}. The identity check comes first so
+ * reflexivity holds regardless of how {@code t.equals} treats a TargetedTuple argument.
+ * (Nothing here is an unchecked operation, so no warning suppression is needed.)
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return t.equals(o);
+ }
+
+ /** Delegates to the wrapped tuple, matching equals(). */
+ @Override
+ public int hashCode() {
+ return t.hashCode();
+ }
+
/**
* @return true if this Tuple is null
*/
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java Tue Nov 24 19:54:19 2009
@@ -185,15 +185,6 @@
switch (execType) {
case LOCAL:
- {
- lfs = new HDataStorage(URI.create("file:///"),
- new Properties());
-
- dfs = lfs;
- executionEngine = new LocalExecutionEngine(this);
- }
- break;
-
case MAPREDUCE:
{
executionEngine = new HExecutionEngine (this);
@@ -203,7 +194,7 @@
dfs = executionEngine.getDataStorage();
lfs = new HDataStorage(URI.create("file:///"),
- new Properties());
+ new Properties());
}
break;
@@ -331,11 +322,7 @@
}
public DataStorage getFs() {
- if(execType == ExecType.LOCAL) {
- return lfs;
- } else {
- return dfs;
- }
+ // LOCAL now falls through to the MAPREDUCE case, which sets dfs from the execution
+ // engine, so dfs is the file system to return in both modes.
+ return dfs;
}
/**
@@ -573,10 +560,6 @@
switch (execType) {
case LOCAL:
- {
- executableManager = new ExecutableManager();
- }
- break;
case MAPREDUCE:
{
executableManager = new HadoopExecutableManager();
@@ -628,9 +611,7 @@
* @return error source
*/
public byte getErrorSource() {
- if(execType == ExecType.LOCAL) {
- return PigException.USER_ENVIRONMENT;
- } else if (execType == ExecType.MAPREDUCE) {
+ if(execType == ExecType.LOCAL || execType == ExecType.MAPREDUCE) {
return PigException.REMOTE_ENVIRONMENT;
} else {
return PigException.BUG;