You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2013/04/11 16:20:56 UTC

git commit: CRUNCH-192 Enforce single use of Reducer Iterables

Updated Branches:
  refs/heads/master 222dd76ac -> cbc7c2fb3


CRUNCH-192 Enforce single use of Reducer Iterables


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/cbc7c2fb
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/cbc7c2fb
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/cbc7c2fb

Branch: refs/heads/master
Commit: cbc7c2fb30ad0486e7ec60656c079c81e41eda2c
Parents: 222dd76
Author: Gabriel Reid <gr...@apache.org>
Authored: Fri Apr 5 22:25:19 2013 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Thu Apr 11 16:17:52 2013 +0200

----------------------------------------------------------------------
 .../apache/crunch/IterableReuseProtectionIT.java   |   89 +++++++++++++++
 .../src/main/java/org/apache/crunch/CombineFn.java |    5 +-
 .../org/apache/crunch/impl/SingleUseIterable.java  |   49 ++++++++
 .../apache/crunch/impl/mem/collect/Shuffler.java   |    3 +-
 .../apache/crunch/impl/mr/run/CrunchReducer.java   |    2 +
 .../apache/crunch/impl/SingleUseIterableTest.java  |   54 +++++++++
 6 files changed, 200 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java b/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
new file mode 100644
index 0000000..da487eb
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Verify that calling iterator() more than once on a Reducer-based Iterable is disallowed.
+ */
+public class IterableReuseProtectionIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /**
+   * Runs a grouped combine step through {@code pipeline} and checks the combined values.
+   * {@link TestIterableReuseFn} throws if a second iterator() call is not rejected.
+   */
+  public void checkIteratorReuse(Pipeline pipeline) throws IOException {
+    Iterable<String> values = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
+        .by(IdentityFn.<String>getInstance(), Writables.strings())
+        .groupByKey()
+        .combineValues(new TestIterableReuseFn())
+        .values().materialize();
+    // Copy the results before done(), which may clean up the pipeline's temporary output
+    List<String> valueList = Lists.newArrayList(values);
+    Collections.sort(valueList);
+    pipeline.done();
+    assertEquals(Lists.newArrayList("a", "b", "c", "e"), valueList);
+  }
+
+  @Test
+  public void testIteratorReuse_MRPipeline() throws IOException {
+    checkIteratorReuse(new MRPipeline(IterableReuseProtectionIT.class, tmpDir.getDefaultConfiguration()));
+  }
+
+  @Test
+  public void testIteratorReuse_InMemoryPipeline() throws IOException {
+    checkIteratorReuse(MemPipeline.getInstance());
+  }
+
+  /** Concatenates each key's values, then checks that a second iterator() call is rejected. */
+  static class TestIterableReuseFn extends CombineFn<String, String> {
+    @Override
+    public void process(Pair<String, Iterable<String>> input, Emitter<Pair<String, String>> emitter) {
+      StringBuilder combinedBuilder = new StringBuilder();
+      for (String v : input.second()) {
+        combinedBuilder.append(v);
+      }
+      try {
+        input.second().iterator();
+        throw new RuntimeException("Second call to iterator should throw an exception");
+      } catch (IllegalStateException e) {
+        // Expected: the grouped values may only produce a single Iterator
+      }
+      emitter.emit(Pair.of(input.first(), combinedBuilder.toString()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java
index c42e48f..71e8057 100644
--- a/crunch/src/main/java/org/apache/crunch/CombineFn.java
+++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java
@@ -38,7 +38,10 @@ import com.google.common.collect.Sets;
  * {@link PGroupedTable}, the function will be applied to the output of the map
  * stage before the data is passed to the reducer, which can improve the runtime
  * of certain classes of jobs.
- * 
+ * <p>
+ * Note that the incoming {@code Iterable} can only be used to create an 
+ * {@code Iterator} once. Calling the {@link Iterable#iterator()} method a second
+ * time will throw an {@link IllegalStateException}.
  */
 public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java b/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java
new file mode 100644
index 0000000..98f982f
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl;
+
+import java.util.Iterator;
+
+/** Wraps a Reducer's input Iterable so that {@link #iterator()} can only be called once. */
+public class SingleUseIterable<T> implements Iterable<T> {
+
+  private final Iterable<T> wrappedIterable;
+  private boolean used = false;
+
+  /**
+   * Instantiate around an Iterable that may only be used once.
+   *
+   * @param toWrap iterable to wrap; must not be {@code null}
+   * @throws NullPointerException if {@code toWrap} is {@code null}
+   */
+  public SingleUseIterable(Iterable<T> toWrap) {
+    if (toWrap == null) {
+      throw new NullPointerException("toWrap must not be null");
+    }
+    this.wrappedIterable = toWrap;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    if (used) {
+      throw new IllegalStateException("iterator() can only be called once on this Iterable");
+    }
+    used = true;
+    return wrappedIterable.iterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
index afc04c3..2e8f9eb 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
@@ -29,6 +29,7 @@ import java.util.TreeMap;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.SingleUseIterable;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -75,7 +76,7 @@ abstract class Shuffler<K, V> implements Iterable<Pair<K, Iterable<V>>> {
   private static class HFunction<K, V> implements Function<Map.Entry<K, Collection<V>>, Pair<K, Iterable<V>>> {
     @Override
     public Pair<K, Iterable<V>> apply(Map.Entry<K, Collection<V>> input) {
-      return Pair.of(input.getKey(), (Iterable<V>) input.getValue());
+      return Pair.<K, Iterable<V>>of(input.getKey(), new SingleUseIterable<V>(input.getValue())); // grouped values may be iterated only once
     }
   }
   

http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
index 12caa86..e5ddbd2 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.impl.SingleUseIterable;
 import org.apache.hadoop.mapreduce.Reducer;
 
 public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
@@ -52,6 +53,7 @@ public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
 
   @Override
   protected void reduce(Object key, Iterable<Object> values, Reducer<Object, Object, Object, Object>.Context context) {
+    values = new SingleUseIterable<Object>(values);
     if (debug) {
       try {
         node.processIterable(key, values);

http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java b/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
new file mode 100644
index 0000000..811a0a3
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SingleUseIterableTest {
+
+  @Test
+  public void testIterator() {
+    List<Integer> values = Lists.newArrayList(1, 2, 3);
+
+    SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values);
+
+    List<Integer> retrievedValues = Lists.newArrayList(iterable);
+
+    assertEquals(values, retrievedValues);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testIterator_MultipleCalls() {
+    List<Integer> values = Lists.newArrayList(1, 2, 3);
+
+    SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values);
+
+    // The first call to iterator() is allowed; copying the values performs it
+    Lists.newArrayList(iterable);
+
+    // Any further call to iterator() must be rejected
+    iterable.iterator();
+  }
+
+}