You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2009/11/12 19:33:18 UTC
svn commit: r835487 [3/3] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengin...
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the min of the Long values in the first field of a tuple.
*/
-public class LongMin extends EvalFunc<Long> implements Algebraic {
+public class LongMin extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
@Override
public Long exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+
+ /* Accumulator interface implementation */
+ private Long intermediateMin = null;
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Long curMin = min(b);
+ if (curMin == null) {
+ return;
+ }
+ /* if bag is not null, initialize intermediateMin to Long.MAX_VALUE */
+ if (intermediateMin == null) {
+ intermediateMin = Long.MAX_VALUE;
+ }
+ intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing min in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ intermediateMin = null;
+ }
+
+ @Override
+ public Long getValue() {
+ return intermediateMin;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
/**
* Generates the sum of the Long values in the first field of a tuple.
*/
-public class LongSum extends EvalFunc<Long> implements Algebraic {
+public class LongSum extends EvalFunc<Long> implements Algebraic, Accumulator<Long> {
@Override
public Long exec(Tuple input) throws IOException {
@@ -155,5 +156,35 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.LONG));
}
+
+ /* Accumulator interface implementation*/
+ private Long intermediateSum = null;
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Long curSum = sum(b);
+ if (curSum == null) {
+ return;
+ }
+ intermediateSum = (intermediateSum == null ? 0L : intermediateSum) + curSum;
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ intermediateSum = null;
+ }
+
+ @Override
+ public Long getValue() {
+ return intermediateSum;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java Thu Nov 12 18:33:15 2009
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -40,7 +41,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class MAX extends EvalFunc<Double> implements Algebraic {
+public class MAX extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -216,5 +217,39 @@
funcList.add(new FuncSpec(LongMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
funcList.add(new FuncSpec(StringMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
return funcList;
+ }
+
+ /* Accumulator interface implementation */
+ private Double intermediateMax = null;
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Double curMax = max(b);
+ if (curMax == null) {
+ return;
+ }
+ /* if bag is not null, initialize intermediateMax to negative infinity */
+ if (intermediateMax == null) {
+ intermediateMax = Double.NEGATIVE_INFINITY;
+ }
+ intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing max in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ intermediateMax = null;
+ }
+
+ @Override
+ public Double getValue() {
+ return intermediateMax;
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java Thu Nov 12 18:33:15 2009
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -40,7 +41,7 @@
/**
* Generates the min of the values of the first field of a tuple.
*/
-public class MIN extends EvalFunc<Double> implements Algebraic {
+public class MIN extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -217,5 +218,39 @@
funcList.add(new FuncSpec(LongMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
funcList.add(new FuncSpec(StringMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
return funcList;
+ }
+
+ /* Accumulator interface implementation */
+ private Double intermediateMin = null;
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Double curMin = min(b);
+ if (curMin == null) {
+ return;
+ }
+ /* if bag is not null, initialize intermediateMin to positive infinity */
+ if (intermediateMin == null) {
+ intermediateMin = Double.POSITIVE_INFINITY;
+ }
+ intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing min in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ intermediateMin = null;
+ }
+
+ @Override
+ public Double getValue() {
+ return intermediateMin;
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java Thu Nov 12 18:33:15 2009
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -39,7 +40,7 @@
/**
* Generates the sum of the values of the first field of a tuple.
*/
-public class SUM extends EvalFunc<Double> implements Algebraic {
+public class SUM extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
@Override
public Double exec(Tuple input) throws IOException {
@@ -222,6 +223,36 @@
funcList.add(new FuncSpec(IntSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
funcList.add(new FuncSpec(LongSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
return funcList;
+ }
+
+ /* Accumulator interface implementation*/
+ private Double intermediateSum = null;
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ Double curSum = sum(b);
+ if (curSum == null) {
+ return;
+ }
+ intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum;
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing sum in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ intermediateSum = null;
+ }
+
+ @Override
+ public Double getValue() {
+ return intermediateSum;
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
/**
* Generates the max of the values of the first field of a tuple.
*/
-public class StringMax extends EvalFunc<String> implements Algebraic {
+public class StringMax extends EvalFunc<String> implements Algebraic, Accumulator<String> {
@Override
public String exec(Tuple input) throws IOException {
@@ -152,4 +153,39 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
}
+
+
+ /* accumulator interface */
+ private String intermediateMax = null;
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ String curMax = max(b);
+ if (curMax == null) {
+ return;
+ }
+ // update if curMax lexicographically follows intermediateMax
+ if (intermediateMax == null || intermediateMax.compareTo(curMax) < 0) {
+ intermediateMax = curMax;
+ }
+
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing max in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ intermediateMax = null;
+ }
+
+ @Override
+ public String getValue() {
+ return intermediateMax;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java Thu Nov 12 18:33:15 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
/**
* Generates the min of the String values in the first field of a tuple.
*/
-public class StringMin extends EvalFunc<String> implements Algebraic {
+public class StringMin extends EvalFunc<String> implements Algebraic, Accumulator<String> {
@Override
public String exec(Tuple input) throws IOException {
@@ -154,5 +155,39 @@
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
}
+
+ /* accumulator interface */
+ private String intermediateMin = null;
+
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ String curMin = min(b);
+ if (curMin == null) {
+ return;
+ }
+ // update if curMin lexicographically precedes intermediateMin
+ if (intermediateMin == null || intermediateMin.compareTo(curMin) > 0) {
+ intermediateMin = curMin;
+ }
+
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing min in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ intermediateMin = null;
+ }
+
+ @Override
+ public String getValue() {
+ return intermediateMin;
+ }
}
Added: hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java Thu Nov 12 18:33:15 2009
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
+
+public class AccumulativeBag implements DataBag {
+ private static final long serialVersionUID = 1L;
+
+ private transient AccumulativeTupleBuffer buffer;
+ private int index;
+
+ public AccumulativeBag(AccumulativeTupleBuffer buffer, int index) {
+ this.buffer = buffer;
+ this.index = index;
+ }
+
+ public void add(Tuple t) {
+ throw new RuntimeException("AccumulativeBag does not support add operation");
+ }
+
+ public void addAll(DataBag b) {
+ throw new RuntimeException("AccumulativeBag does not support add operation");
+ }
+
+ public void clear() {
+ throw new RuntimeException("AccumulativeBag does not support clear operation");
+ }
+
+ public boolean isDistinct() {
+ return false;
+ }
+
+ public boolean isSorted() {
+ return false;
+ }
+
+ public AccumulativeTupleBuffer getTuplebuffer() {
+ return buffer;
+ }
+
+ public Iterator<Tuple> iterator() {
+ return buffer.getTuples(index);
+ }
+
+ public void markStale(boolean stale) {
+
+ }
+
+ public long size() {
+ throw new RuntimeException("AccumulativeBag does not support size() operation");
+ }
+
+ public long getMemorySize() {
+ return 0;
+ }
+
+ public long spill() {
+ return 0;
+ }
+
+ public void readFields(DataInput datainput) throws IOException {
+ throw new IOException("AccumulativeBag does not support readFields operation");
+ }
+
+ public void write(DataOutput dataoutput) throws IOException {
+ throw new IOException("AccumulativeBag does not support write operation");
+ }
+
+ public int compareTo(Object other) {
+ throw new RuntimeException("AccumulativeBag does not support compareTo() operation");
+ }
+
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ return false;
+ }
+
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return 42;
+ }
+
+}
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java Thu Nov 12 18:33:15 2009
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestAccumulator extends TestCase{
+ private static final String INPUT_FILE = "AccumulatorInput.txt";
+ private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
+ private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
+
+ private PigServer pigServer;
+ private MiniCluster cluster = MiniCluster.buildCluster();
+
+ public TestAccumulator() throws ExecException, IOException{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ // pigServer = new PigServer(ExecType.LOCAL);
+ pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "2");
+ pigServer.getPigContext().getProperties().setProperty("pig.exec.batchsize", "2");
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ createFiles();
+ }
+
+ private void createFiles() throws IOException {
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+
+ w.println("100\tapple");
+ w.println("200\torange");
+ w.println("300\tstrawberry");
+ w.println("300\tpear");
+ w.println("100\tapple");
+ w.println("300\tpear");
+ w.println("400\tapple");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ w = new PrintWriter(new FileWriter(INPUT_FILE2));
+
+ w.println("100\t");
+ w.println("100\t");
+ w.println("200\t");
+ w.println("200\t");
+ w.println("300\tstrawberry");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
+
+ w = new PrintWriter(new FileWriter(INPUT_FILE3));
+
+ w.println("100\t1.0");
+ w.println("100\t2.0");
+ w.println("200\t1.1");
+ w.println("200\t2.1");
+ w.println("100\t3.0");
+ w.println("100\t4.0");
+ w.println("200\t3.1");
+ w.println("100\t5.0");
+ w.println("300\t3.3");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ new File(INPUT_FILE).delete();
+ Util.deleteFile(cluster, INPUT_FILE);
+ new File(INPUT_FILE2).delete();
+ Util.deleteFile(cluster, INPUT_FILE2);
+ new File(INPUT_FILE3).delete();
+ Util.deleteFile(cluster, INPUT_FILE3);
+ }
+
+
+ public void testAccumBasic() throws IOException{
+ // test group by
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 2);
+ expected.put(200, 1);
+ expected.put(300, 3);
+ expected.put(400, 1);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.BagCount(A);");
+
+ try{
+ iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ fail("accumulator should not be called.");
+ }catch(IOException e) {
+ // should throw exception from AccumulatorBagCount.
+ }
+
+ // test cogroup
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("C = cogroup A by id, B by id;");
+ pigServer.registerQuery("D = foreach C generate group, " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.AccumulatorBagCount(B);");
+
+ HashMap<Integer, String> expected2 = new HashMap<Integer, String>();
+ expected2.put(100, "2,2");
+ expected2.put(200, "1,1");
+ expected2.put(300, "3,3");
+ expected2.put(400, "1,1");
+
+
+ iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected2.get((Integer)t.get(0)), t.get(1).toString()+","+t.get(2).toString());
+ }
+ }
+
+ public void testAccumWithNegative() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, -org.apache.pig.test.utils.AccumulatorBagCount(A);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, -2);
+ expected.put(200, -1);
+ expected.put(300, -3);
+ expected.put(400, -1);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithAdd() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A)+1.0;");
+
+ {
+ HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
+ expected.put(100, 3.0);
+ expected.put(200, 2.0);
+ expected.put(300, 4.0);
+ expected.put(400, 2.0);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1));
+ }
+ }
+
+ {
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A)+org.apache.pig.test.utils.AccumulatorBagCount(A);");
+
+ HashMap<Integer, Integer>expected = new HashMap<Integer, Integer>();
+ expected.put(100, 4);
+ expected.put(200, 2);
+ expected.put(300, 6);
+ expected.put(400, 2);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+ }
+
+ public void testAccumWithMinus() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ " org.apache.pig.test.utils.AccumulatorBagCount(A)*3.0-org.apache.pig.test.utils.AccumulatorBagCount(A);");
+
+ HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
+ expected.put(100, 4.0);
+ expected.put(200, 2.0);
+ expected.put(300, 6.0);
+ expected.put(400, 2.0);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1));
+ }
+ }
+
+ public void testAccumWithMod() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A) % 2;");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 0);
+ expected.put(200, 1);
+ expected.put(300, 1);
+ expected.put(400, 1);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithDivide() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A)/2;");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 1);
+ expected.put(200, 0);
+ expected.put(300, 1);
+ expected.put(400, 0);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithAnd() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A)<3)?0:1);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 0);
+ expected.put(200, 1);
+ expected.put(300, 1);
+ expected.put(400, 1);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithOr() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A)<2)?0:1);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 1);
+ expected.put(200, 0);
+ expected.put(300, 1);
+ expected.put(400, 0);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithRegexp() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "(((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' ?0:1);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 1);
+ expected.put(200, 0);
+ expected.put(300, 1);
+ expected.put(400, 0);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+
+ public void testAccumWithIsNull() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "((chararray)org.apache.pig.test.utils.AccumulativeSumBag(A) is null?0:1);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 0);
+ expected.put(200, 0);
+ expected.put(300, 1);
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithDistinct() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B { D = distinct A; generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 2);
+ expected.put(200, 2);
+ expected.put(300, 3);
+ expected.put(400, 2);
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithSort() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
+ pigServer.registerQuery("B = foreach A generate id, f, id as t;");
+ pigServer.registerQuery("C = group B by id;");
+ pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f; generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};");
+
+ HashMap<Integer, String> expected = new HashMap<Integer, String>();
+ expected.put(100, "(apple)(apple)");
+ expected.put(200, "(orange)");
+ expected.put(300, "(pear)(pear)(strawberry)");
+ expected.put(400, "(apple)");
+
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1));
+ }
+ }
+
+ public void testAccumWithBuildin() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
+ pigServer.registerQuery("C = group A by id;");
+ pigServer.registerQuery("D = foreach C generate group, SUM(A.v), AVG(A.v), COUNT(A.v), MIN(A.v), MAX(A.v);");
+
+ HashMap<Integer, Double[]> expected = new HashMap<Integer, Double[]>();
+ expected.put(100, new Double[]{15.0,3.0,5.0,1.0,5.0});
+ expected.put(200, new Double[]{6.3,2.1,3.0,1.1,3.1});
+ expected.put(300, new Double[]{3.3,3.3,1.0,3.3,3.3});
+
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ Double[] v = expected.get((Integer)t.get(0));
+ for(int i=0; i<v.length; i++) {
+ assertEquals(v[i].doubleValue(), ((Number)t.get(i+1)).doubleValue(), 0.0001);
+ }
+ }
+ }
+}
Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java Thu Nov 12 18:33:15 2009
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.Accumulator;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Test UDF implementing the {@link Accumulator} interface.  Concatenates the
+ * string form of every tuple found in the input bag, accumulating the result
+ * across successive {@link #accumulate(Tuple)} calls until {@link #cleanup()}
+ * is invoked.
+ */
+public class AccumulativeSumBag extends EvalFunc<String> implements Accumulator<String>
+{
+    // Accumulated concatenation of tuple strings.  Lazily created on the
+    // first non-null bag and reset to null by cleanup().  StringBuilder is
+    // preferred over StringBuffer: the UDF instance is used from a single
+    // thread, so the synchronization of StringBuffer is pure overhead.
+    StringBuilder sb;
+
+    public AccumulativeSumBag() {
+    }
+
+    /**
+     * Appends the string form of each tuple in the bag held in field 0 of
+     * {@code tuple}.  A null bag is a no-op; tuples that have a second field
+     * whose value is null are skipped.
+     */
+    public void accumulate(Tuple tuple) throws IOException {
+        DataBag databag = (DataBag)tuple.get(0);
+        if (databag == null) {
+            return;
+        }
+
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        Iterator<Tuple> iterator = databag.iterator();
+        while (iterator.hasNext()) {
+            Tuple t = iterator.next();
+            // Skip tuples carrying an explicit null in their second field.
+            if (t.size() > 1 && t.get(1) == null) {
+                continue;
+            }
+            sb.append(t.toString());
+        }
+    }
+
+    /** @return the accumulated string, or null if nothing was accumulated. */
+    public String getValue() {
+        if (sb != null && sb.length() > 0) {
+            return sb.toString();
+        }
+        return null;
+    }
+
+    /** Resets the accumulator so this instance can be reused for the next key. */
+    public void cleanup() {
+        sb = null;
+    }
+
+    /** Accumulator UDFs are never driven through exec(). */
+    public String exec(Tuple tuple) throws IOException {
+        throw new IOException("exec() should not be called");
+    }
+}
+
\ No newline at end of file
Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java Thu Nov 12 18:33:15 2009
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.Accumulator;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Test UDF implementing the {@link Accumulator} interface.  Counts the tuples
+ * in the input bag, accumulating the total across successive
+ * {@link #accumulate(Tuple)} calls until {@link #cleanup()} is invoked.
+ */
+public class AccumulatorBagCount extends EvalFunc<Integer> implements Accumulator<Integer> {
+
+    // Running tuple count; reset to zero by cleanup().
+    int count = 0;
+
+    public AccumulatorBagCount() {
+    }
+
+    /**
+     * Adds the number of tuples in the bag held in field 0 of {@code tuple}
+     * to the running count.  A null bag is a no-op.
+     */
+    public void accumulate(Tuple tuple) throws IOException {
+        DataBag databag = (DataBag)tuple.get(0);
+        if (databag == null) {
+            return;
+        }
+
+        Iterator<Tuple> iterator = databag.iterator();
+        while (iterator.hasNext()) {
+            iterator.next();
+            count++;
+        }
+    }
+
+    /** @return the number of tuples seen since the last cleanup(). */
+    public Integer getValue() {
+        // Integer.valueOf() may reuse cached instances; the boxing
+        // constructor new Integer(int) always allocates and is deprecated.
+        return Integer.valueOf(count);
+    }
+
+    /** Resets the count so this instance can be reused for the next key. */
+    public void cleanup() {
+        count = 0;
+    }
+
+    /** Accumulator UDFs are never driven through exec(). */
+    public Integer exec(Tuple tuple) throws IOException {
+        throw new IOException("exec() should not be called.");
+    }
+}
+
\ No newline at end of file
Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java Thu Nov 12 18:33:15 2009
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Test UDF that returns the number of tuples in the bag held in field 0 of
+ * the input tuple.  A null bag counts as zero.
+ */
+public class BagCount extends EvalFunc<Integer> {
+
+    public BagCount() {
+    }
+
+    /** @return the tuple count of the bag in field 0, or 0 if the bag is null. */
+    public Integer exec(Tuple tuple) throws IOException {
+        DataBag databag = (DataBag)tuple.get(0);
+        if (databag == null) {
+            // Integer.valueOf() reuses cached instances; the boxing
+            // constructor new Integer(int) always allocates and is deprecated.
+            return Integer.valueOf(0);
+        }
+
+        int count = 0;
+        Iterator<Tuple> iterator = databag.iterator();
+        while (iterator.hasNext()) {
+            iterator.next();
+            count++;
+        }
+
+        return Integer.valueOf(count);
+    }
+}
+
\ No newline at end of file
Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java Thu Nov 12 18:33:15 2009
@@ -17,6 +17,8 @@
*/
package org.apache.pig.test.utils;
+import java.util.List;
+
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.impl.plan.OperatorKey;
@@ -96,7 +98,10 @@
}
return new Result();
}
-
-
+
+ @Override
+ protected List<ExpressionOperator> getChildExpressions() {
+ return null;
+ }
}