You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/08/31 00:07:11 UTC
svn commit: r1163430 - in /pig/trunk:
src/org/apache/pig/builtin/CubeDimensions.java
test/org/apache/pig/test/TestCubeDimensions.java
Author: dvryaboy
Date: Tue Aug 30 22:07:11 2011
New Revision: 1163430
URL: http://svn.apache.org/viewvc?rev=1163430&view=rev
Log:
actually adding files from PIG-2168
Added:
pig/trunk/src/org/apache/pig/builtin/CubeDimensions.java
pig/trunk/test/org/apache/pig/test/TestCubeDimensions.java
Added: pig/trunk/src/org/apache/pig/builtin/CubeDimensions.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/CubeDimensions.java?rev=1163430&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/CubeDimensions.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/CubeDimensions.java Tue Aug 30 22:07:11 2011
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Produces a DataBag with all combinations of the argument tuple members
+ * as in a data cube. Meaning, (a, b, c) will produce the following bag:
+ * <pre>
+ * { (a, b, c), (null, null, null), (a, b, null), (a, null, c),
+ * (a, null, null), (null, b, c), (null, null, c), (null, b, null) }
+ * </pre>
+ * <p>
+ * The "all" marker is null by default, but can be set to an arbitrary string by
+ * invoking a constructor (via a DEFINE). The constructor takes a single argument,
+ * the string you want to represent "all". With the default marker, a dimension
+ * whose input value is already null cannot be told apart from "all" in the output.
+ * <p>
+ * Usage goes something like this:
+ * <pre>{@code
+ * events = load '/logs/events' using EventLoader() as (lang, event, app_id, measure);
+ * cubed = foreach events generate
+ * FLATTEN(CubeDimensions(lang, event, app_id))
+ * as (lang, event, app_id),
+ * measure;
+ * cube = foreach (group cubed
+ * by (lang, event, app_id) parallel $P)
+ * generate
+ * flatten(group) as (lang, event, app_id),
+ * COUNT_STAR(cubed),
+ * SUM(cubed.measure);
+ * store cube into 'event_cube';
+ * }</pre>
+ * <p>
+ * <b>Note</b>: doing this with non-algebraic aggregations on large data can result
+ * in very slow reducers, since one of the groups is going to get <i>all</i> the
+ * records in your relation.
+ */
+public class CubeDimensions extends EvalFunc<DataBag> {
+
+    private static final BagFactory bf = BagFactory.getInstance();
+    private static final TupleFactory tf = TupleFactory.getInstance();
+
+    /** Value written into every rolled-up dimension; null unless set through the constructor. */
+    private final String allMarker;
+
+    /** Creates the function with null as the "all" marker. */
+    public CubeDimensions() {
+        this(null);
+    }
+
+    /**
+     * Creates the function with a caller-chosen "all" marker.
+     *
+     * @param allMarker value written into each rolled-up dimension; may be null
+     */
+    public CubeDimensions(String allMarker) {
+        super();
+        this.allMarker = allMarker;
+    }
+
+    /**
+     * Expands the input tuple into every combination in which each field is either
+     * kept or replaced by the "all" marker.
+     *
+     * @param tuple the dimension values, one field per dimension
+     * @return a bag holding one tuple per combination (a single empty tuple for an
+     *         empty input), or null when {@code tuple} is null
+     * @throws IOException if a field of the input or of a result tuple cannot be accessed
+     */
+    @Override
+    public DataBag exec(Tuple tuple) throws IOException {
+        if (tuple == null) {
+            // Nothing to cube; hand the null back instead of failing with a NullPointerException.
+            return null;
+        }
+        List<Tuple> result = Lists.newArrayListWithCapacity((int) Math.pow(2, tuple.size()));
+        recursivelyCube(result, tuple, 0, tf.newTuple(tuple.size()));
+        return bf.newDefaultBag(result);
+    }
+
+    /**
+     * Fills positions {@code index} and onward of {@code newt}, once with the input
+     * value and once with the "all" marker, and appends every completed tuple to
+     * {@code result}.
+     *
+     * @param result accumulator for completed combinations
+     * @param input  the original dimension values
+     * @param index  position being decided in this call
+     * @param newt   partially built tuple; positions before {@code index} are final
+     * @throws ExecException if a tuple field cannot be read or written
+     */
+    private void recursivelyCube(List<Tuple> result, Tuple input, int index, Tuple newt) throws ExecException {
+        // Stopping one past the last field (rather than on it) means an empty
+        // input simply yields the one empty combination instead of touching field 0.
+        if (index == input.size()) {
+            result.add(newt);
+            return;
+        }
+        newt.set(index, input.get(index));
+        recursivelyCube(result, input, index + 1, newt);
+        // tf.newTuple makes a copy. tf.newTupleNoCopy doesn't.
+        // The copy is required: newt may already sit in result and must not change.
+        Tuple newnewt = tf.newTuple(newt.getAll());
+        newnewt.set(index, allMarker);
+        recursivelyCube(result, input, index + 1, newnewt);
+    }
+
+    /**
+     * Describes the output as a single bag field named "dimensions" whose inner
+     * schema is the input schema.
+     *
+     * @param input schema of the arguments passed to the function
+     * @return schema with one BAG field wrapping {@code input}
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        try {
+            return new Schema(new FieldSchema("dimensions", input, DataType.BAG));
+        } catch (FrontendException e) {
+            // we are specifying BAG explicitly, so this should not happen.
+            throw new RuntimeException(e);
+        }
+    }
+}
Added: pig/trunk/test/org/apache/pig/test/TestCubeDimensions.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCubeDimensions.java?rev=1163430&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCubeDimensions.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestCubeDimensions.java Tue Aug 30 22:07:11 2011
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.pig.builtin.CubeDimensions;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+public class TestCubeDimensions {
+
+    private static final TupleFactory TF = TupleFactory.getInstance();
+
+    /**
+     * Cubes the three-field tuple (a, b, c) with "ALL" as the marker and checks
+     * that the resulting bag holds exactly the eight expected combinations.
+     */
+    @Test
+    public void testCube() throws IOException {
+        Tuple t = TF.newTuple(ImmutableList.of("a", "b", "c"));
+        Set<Tuple> expected = ImmutableSet.of(
+            TF.newTuple(ImmutableList.of("a", "b", "c")),
+            TF.newTuple(ImmutableList.of("a", "b", "ALL")),
+            TF.newTuple(ImmutableList.of("a", "ALL", "c")),
+            TF.newTuple(ImmutableList.of("ALL", "b", "c")),
+            TF.newTuple(ImmutableList.of("ALL", "ALL", "c")),
+            TF.newTuple(ImmutableList.of("a", "ALL", "ALL")),
+            TF.newTuple(ImmutableList.of("ALL", "ALL", "ALL")),
+            TF.newTuple(ImmutableList.of("ALL", "b", "ALL"))
+        );
+
+        CubeDimensions cd = new CubeDimensions("ALL");
+        DataBag bag = cd.exec(t);
+        // JUnit expects (expected, actual); the reverse order garbles the failure message.
+        assertEquals(expected.size(), bag.size());
+
+        for (Tuple tup : bag) {
+            assertTrue("unexpected tuple in cube: " + tup, expected.contains(tup));
+        }
+        // Size and membership alone would let a duplicated tuple hide a missing one.
+        assertEquals(expected, ImmutableSet.copyOf(bag));
+    }
+}