You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/08/07 20:33:09 UTC

git commit: CRUNCH-248: Fix exception masking issue in CrunchReducer caused by SingleUseIterable

Updated Branches:
  refs/heads/master 98458852a -> fd957a920


CRUNCH-248: Fix exception masking issue in CrunchReducer caused by SingleUseIterable


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fd957a92
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fd957a92
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fd957a92

Branch: refs/heads/master
Commit: fd957a9202ab2c72b835d64d9d1b08c3b5d71c85
Parents: 9845885
Author: Josh Wills <jw...@apache.org>
Authored: Wed Aug 7 06:47:03 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Aug 7 06:47:03 2013 -0700

----------------------------------------------------------------------
 .../crunch/SingleUseIterableExceptionIT.java    | 70 ++++++++++++++++++++
 .../src/it/java/org/apache/crunch/TfIdfIT.java  |  1 -
 .../apache/crunch/types/PGroupedTableType.java  | 61 ++++++++++++-----
 3 files changed, 115 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/fd957a92/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
new file mode 100644
index 0000000..ccc91c6
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Iterator;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Integration test for CRUNCH-248: runs a job whose grouped-values function (ReduceFn) always throws.
+ */
+public class SingleUseIterableExceptionIT {
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create(); // scratch dir for the input copy and the job output
+
+  static class ReduceFn extends MapFn<Iterable<String>, String> { // always throws; never returns a value
+    @Override
+    public String map(Iterable<String> input) {
+      Iterator<String> iter = input.iterator(); // intentionally unused: calls iterator() before throwing (presumably to consume the single-use values iterable)
+      throw new CrunchRuntimeException("Exception");
+    }
+  }
+  
+  @Test
+  public void testException() throws Exception { // copies shakes.txt into tmpDir and runs the failing job
+    run(new MRPipeline(SingleUseIterableExceptionIT.class),
+        tmpDir.copyResourceFileName("shakes.txt"),
+        tmpDir.getFileName("out"));
+  }
+  
+  public static void run(MRPipeline p, String input, String output) { // read text -> key by prefix -> groupByKey -> ReduceFn -> write text
+    PCollection<String> shakes = p.readTextFile(input);
+    shakes.parallelDo(new MapFn<String, Pair<String, String>>() {
+      @Override
+      public Pair<String, String> map(String input) {
+        if (input.length() > 5) { // key: first 5 chars of the line, or "__SHORT__" for lines of length <= 5
+          return Pair.of(input.substring(0, 5), input);
+        } else {
+          return Pair.of("__SHORT__", input);
+        }
+      } 
+    }, Avros.tableOf(Avros.strings(), Avros.strings()))
+    .groupByKey()
+    .mapValues(new ReduceFn(), Avros.strings()) // throws for every key group that reaches it
+    .write(To.textFile(output));
+    p.done(); // NOTE(review): any result returned here is not checked, so nothing is asserted about the job outcome
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/fd957a92/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
index 23e45ca..640686a 100644
--- a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java
@@ -26,7 +26,6 @@ import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.crunch.fn.MapKeysFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.seq.SeqFileSourceTarget;
 import org.apache.crunch.lib.Aggregate;

http://git-wip-us.apache.org/repos/asf/crunch/blob/fd957a92/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
index d276cd6..4e72054 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PGroupedTableType.java
@@ -42,37 +42,66 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
 
   protected static class PTypeIterable<V> implements Iterable<V> {
     private final Iterable<Object> iterable;
-    private final MapFn<Object, V> mapFn;
+    private final HoldLastIterator<V> holdLastIter; // one iterator object, re-pointed at a fresh source iterator on each iterator() call

     public PTypeIterable(MapFn<Object, V> mapFn, Iterable<Object> iterable) {
-      this.mapFn = mapFn;
       this.iterable = iterable;
+      this.holdLastIter = new HoldLastIterator<V>(mapFn); // mapFn converts each raw value to V inside next()
     }

     public Iterator<V> iterator() {
-      return new Iterator<V>() {
-        Iterator<Object> iter = iterable.iterator();
+      return holdLastIter.reset(iterable.iterator()); // NOTE: every call returns the same HoldLastIterator instance
+    }
+    
+    @Override
+    public String toString() {
+      return holdLastIter.toString(); // summarizes without re-iterating the (possibly single-use) source iterable
+    }
+  }
+
+  protected static class HoldLastIterator<V> implements Iterator<V> { // maps raw values through mapFn and remembers only the latest result

-        public boolean hasNext() {
-          return iter.hasNext();
-        }
+    private Iterator<Object> iter; // null until reset() is first called
+    private V lastReturned = null; // most recent value produced by next(); read only by toString()
+    private final MapFn<Object, V> mapFn;
+    
+    public HoldLastIterator(MapFn<Object, V> mapFn) {
+      this.mapFn = mapFn;
+    }
+    
+    public HoldLastIterator<V> reset(Iterator<Object> iter) { // re-points at a new source; lastReturned is intentionally kept
+      this.iter = iter;
+      return this;
+    }
+    
+    @Override
+    public boolean hasNext() {
+      return iter.hasNext();
+    }

-        public V next() {
-          return mapFn.map(iter.next());
-        }
+    @Override
+    public V next() {
+      lastReturned = mapFn.map(iter.next());
+      return lastReturned;
+    }

-        public void remove() {
-          iter.remove();
-        }
-      };
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException(); // removal is not supported by this iterator
     }
    
     @Override
     public String toString() {
-      return Iterables.toString(this);
+      StringBuilder sb = new StringBuilder().append('['); // never iterates the source, which may be single-use
+      if (lastReturned != null) {
+        sb.append(lastReturned).append(", ...");
+      } else if (iter != null) {
+        sb.append("...");
+      }
+      return sb.append(']').toString(); // always close the bracket: an untouched iterator prints "[]", not "["
     }
   }
-
+  
   public static class PairIterableMapFn<K, V> extends MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> {
     private final MapFn<Object, K> keys;
     private final MapFn<Object, V> values;