You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2013/04/11 16:20:56 UTC
git commit: CRUNCH-192 Enforce single use of Reducer Iterables
Updated Branches:
refs/heads/master 222dd76ac -> cbc7c2fb3
CRUNCH-192 Enforce single use of Reducer Iterables
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/cbc7c2fb
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/cbc7c2fb
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/cbc7c2fb
Branch: refs/heads/master
Commit: cbc7c2fb30ad0486e7ec60656c079c81e41eda2c
Parents: 222dd76
Author: Gabriel Reid <gr...@apache.org>
Authored: Fri Apr 5 22:25:19 2013 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Thu Apr 11 16:17:52 2013 +0200
----------------------------------------------------------------------
.../apache/crunch/IterableReuseProtectionIT.java | 89 +++++++++++++++
.../src/main/java/org/apache/crunch/CombineFn.java | 5 +-
.../org/apache/crunch/impl/SingleUseIterable.java | 49 ++++++++
.../apache/crunch/impl/mem/collect/Shuffler.java | 3 +-
.../apache/crunch/impl/mr/run/CrunchReducer.java | 2 +
.../apache/crunch/impl/SingleUseIterableTest.java | 54 +++++++++
6 files changed, 200 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java b/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
new file mode 100644
index 0000000..da487eb
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/IterableReuseProtectionIT.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Verify that calling the iterator method more than once on a Reducer-based
+ * Iterable is forcefully disallowed.
+ */
+public class IterableReuseProtectionIT {
+
+ @Rule
+ public TemporaryPath tmpDir = TemporaryPaths.create();
+
+
+ public void checkIteratorReuse(Pipeline pipeline) throws IOException {
+ Iterable<String> values = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
+ .by(IdentityFn.<String>getInstance(), Writables.strings())
+ .groupByKey()
+ .combineValues(new TestIterableReuseFn())
+ .values().materialize();
+
+ List<String> valueList = Lists.newArrayList(values);
+ Collections.sort(valueList);
+ assertEquals(Lists.newArrayList("a", "b", "c", "e"), valueList);
+ }
+
+ @Test
+ public void testIteratorReuse_MRPipeline() throws IOException {
+ checkIteratorReuse(new MRPipeline(IterableReuseProtectionIT.class, tmpDir.getDefaultConfiguration()));
+ }
+
+ @Test
+ public void testIteratorReuse_InMemoryPipeline() throws IOException {
+ checkIteratorReuse(MemPipeline.getInstance());
+ }
+
+ static class TestIterableReuseFn extends CombineFn<String, String> {
+
+ @Override
+ public void process(Pair<String, Iterable<String>> input, Emitter<Pair<String, String>> emitter) {
+ StringBuilder combinedBuilder = new StringBuilder();
+ for (String v : input.second()) {
+ combinedBuilder.append(v);
+ }
+
+ try {
+ input.second().iterator();
+ throw new RuntimeException("Second call to iterator should throw an exception");
+ } catch (IllegalStateException e) {
+ // Expected situation
+ }
+ emitter.emit(Pair.of(input.first(), combinedBuilder.toString()));
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java
index c42e48f..71e8057 100644
--- a/crunch/src/main/java/org/apache/crunch/CombineFn.java
+++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java
@@ -38,7 +38,10 @@ import com.google.common.collect.Sets;
* {@link PGroupedTable}, the function will be applied to the output of the map
* stage before the data is passed to the reducer, which can improve the runtime
* of certain classes of jobs.
- *
+ * <p>
+ * Note that the incoming {@code Iterable} can only be used to create an
+ * {@code Iterator} once. Calling the {@link Iterable#iterator()} method a
+ * second time will throw an {@link IllegalStateException}.
*/
public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java b/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java
new file mode 100644
index 0000000..98f982f
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/SingleUseIterable.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl;
+
+import java.util.Iterator;
+
+/**
+ * Wrapper around a Reducer's input Iterable. Ensures that the
+ * {@link #iterator()} method is not called more than once.
+ */
+public class SingleUseIterable<T> implements Iterable<T> {
+
+ private boolean used = false;
+ private Iterable<T> wrappedIterable;
+
+ /**
+ * Instantiate around an Iterable that may only be used once.
+ *
+ * @param toWrap iterable to wrap
+ */
+ public SingleUseIterable(Iterable<T> toWrap) {
+ this.wrappedIterable = toWrap;
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ if (used) {
+ throw new IllegalStateException("iterator() can only be called once on this Iterable");
+ }
+ used = true;
+ return wrappedIterable.iterator();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
index afc04c3..2e8f9eb 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
@@ -29,6 +29,7 @@ import java.util.TreeMap;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.SingleUseIterable;
import org.apache.crunch.types.PType;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.ReflectionUtils;
@@ -75,7 +76,7 @@ abstract class Shuffler<K, V> implements Iterable<Pair<K, Iterable<V>>> {
private static class HFunction<K, V> implements Function<Map.Entry<K, Collection<V>>, Pair<K, Iterable<V>>> {
@Override
public Pair<K, Iterable<V>> apply(Map.Entry<K, Collection<V>> input) {
- return Pair.of(input.getKey(), (Iterable<V>) input.getValue());
+ return Pair.<K, Iterable<V>>of(input.getKey(), new SingleUseIterable<V>(input.getValue()));
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
index 12caa86..e5ddbd2 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.impl.SingleUseIterable;
import org.apache.hadoop.mapreduce.Reducer;
public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
@@ -52,6 +53,7 @@ public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
@Override
protected void reduce(Object key, Iterable<Object> values, Reducer<Object, Object, Object, Object>.Context context) {
+ values = new SingleUseIterable<Object>(values);
if (debug) {
try {
node.processIterable(key, values);
http://git-wip-us.apache.org/repos/asf/crunch/blob/cbc7c2fb/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java b/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
new file mode 100644
index 0000000..811a0a3
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SingleUseIterableTest {
+
+ @Test
+ public void testIterator() {
+ List<Integer> values = Lists.newArrayList(1,2,3);
+
+ SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values);
+
+ List<Integer> retrievedValues = Lists.newArrayList(iterable);
+
+ assertEquals(values, retrievedValues);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testIterator_MultipleCalls() {
+ List<Integer> values = Lists.newArrayList(1,2,3);
+
+ SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values);
+
+ List<Integer> retrievedValues = Lists.newArrayList(iterable);
+
+ for (Integer n : iterable) {
+
+ }
+ }
+
+}