You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/08/07 20:33:09 UTC
git commit: CRUNCH-248: Fix exception masking issue in CrunchReducer caused by SingleUseIterable
Updated Branches:
refs/heads/master 98458852a -> fd957a920
CRUNCH-248: Fix exception masking issue in CrunchReducer caused by SingleUseIterable
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fd957a92
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fd957a92
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fd957a92
Branch: refs/heads/master
Commit: fd957a9202ab2c72b835d64d9d1b08c3b5d71c85
Parents: 9845885
Author: Josh Wills <jw...@apache.org>
Authored: Wed Aug 7 06:47:03 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Aug 7 06:47:03 2013 -0700
----------------------------------------------------------------------
.../crunch/SingleUseIterableExceptionIT.java | 70 ++++++++++++++++++++
.../src/it/java/org/apache/crunch/TfIdfIT.java | 1 -
.../apache/crunch/types/PGroupedTableType.java | 61 ++++++++++++-----
3 files changed, 115 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/fd957a92/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
new file mode 100644
index 0000000..ccc91c6
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Iterator;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class SingleUseIterableExceptionIT {
+
+ @Rule
+ public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+ static class ReduceFn extends MapFn<Iterable<String>, String> {
+ @Override
+ public String map(Iterable<String> input) {
+ Iterator<String> iter = input.iterator();
+ throw new CrunchRuntimeException("Exception");
+ }
+ }
+
+ @Test
+ public void testException() throws Exception {
+ run(new MRPipeline(SingleUseIterableExceptionIT.class),
+ tmpDir.copyResourceFileName("shakes.txt"),
+ tmpDir.getFileName("out"));
+ }
+
+ public static void run(MRPipeline p, String input, String output) {
+ PCollection<String> shakes = p.readTextFile(input);
+ shakes.parallelDo(new MapFn<String, Pair<String, String>>() {
+ @Override
+ public Pair<String, String> map(String input) {
+ if (input.length() > 5) {
+ return Pair.of(input.substring(0, 5), input);
+ } else {
+ return Pair.of("__SHORT__", input);
+ }
+ }
+ }, Avros.tableOf(Avros.strings(), Avros.strings()))
+ .groupByKey()
+ .mapValues(new ReduceFn(), Avros.strings())
+ .write(To.textFile(output));
+ p.done();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/fd957a92/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
index 23e45ca..640686a 100644
--- a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
@@ -26,7 +26,6 @@ import java.nio.charset.Charset;
import java.util.Collection;
import java.util.List;
-import org.apache.crunch.fn.MapKeysFn;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.seq.SeqFileSourceTarget;
import org.apache.crunch.lib.Aggregate;
http://git-wip-us.apache.org/repos/asf/crunch/blob/fd957a92/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
index d276cd6..4e72054 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
@@ -42,37 +42,66 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
protected static class PTypeIterable<V> implements Iterable<V> {
private final Iterable<Object> iterable;
- private final MapFn<Object, V> mapFn;
+ private final HoldLastIterator<V> holdLastIter;
public PTypeIterable(MapFn<Object, V> mapFn, Iterable<Object> iterable) {
- this.mapFn = mapFn;
this.iterable = iterable;
+ this.holdLastIter = new HoldLastIterator<V>(mapFn);
}
public Iterator<V> iterator() {
- return new Iterator<V>() {
- Iterator<Object> iter = iterable.iterator();
+ return holdLastIter.reset(iterable.iterator());
+ }
+
+ @Override
+ public String toString() {
+ return holdLastIter.toString();
+ }
+ }
+
+ protected static class HoldLastIterator<V> implements Iterator<V> {
- public boolean hasNext() {
- return iter.hasNext();
- }
+ private Iterator<Object> iter;
+ private V lastReturned = null;
+ private final MapFn<Object, V> mapFn;
+
+ public HoldLastIterator(MapFn<Object, V> mapFn) {
+ this.mapFn = mapFn;
+ }
+
+ public HoldLastIterator<V> reset(Iterator<Object> iter) {
+ this.iter = iter;
+ return this;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
- public V next() {
- return mapFn.map(iter.next());
- }
+ @Override
+ public V next() {
+ lastReturned = mapFn.map(iter.next());
+ return lastReturned;
+ }
- public void remove() {
- iter.remove();
- }
- };
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
}
@Override
public String toString() {
- return Iterables.toString(this);
+ StringBuilder sb = new StringBuilder().append('[');
+ if (lastReturned != null) {
+ sb.append(lastReturned).append(", ...]");
+ } else if (iter != null) {
+ sb.append("...]");
+ }
+ return sb.toString();
}
}
-
+
public static class PairIterableMapFn<K, V> extends MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> {
private final MapFn<Object, K> keys;
private final MapFn<Object, V> values;