You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ch...@apache.org on 2014/03/29 17:30:57 UTC

[1/3] git commit: CRUNCH-364: Fix failure on mvn dependency:tree

Repository: crunch
Updated Branches:
  refs/heads/master d39923de3 -> d4917a217


CRUNCH-364: Fix failure on mvn dependency:tree


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e03cd2b9
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e03cd2b9
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e03cd2b9

Branch: refs/heads/master
Commit: e03cd2b9ca5d779310be529f0f0023380f0f6cc2
Parents: eac45b0
Author: Chao Shi <ch...@apache.org>
Authored: Wed Mar 12 23:42:35 2014 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Wed Mar 12 23:45:00 2014 +0800

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/e03cd2b9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8812f5e..c260016 100644
--- a/pom.xml
+++ b/pom.xml
@@ -657,7 +657,7 @@ under the License.
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-dependency-plugin</artifactId>
-          <version>2.5.1</version>
+          <version>2.8</version>
         </plugin>
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>


[3/3] git commit: CRUNCH-351: Improve performance of Shard#shard().

Posted by ch...@apache.org.
CRUNCH-351: Improve performance of Shard#shard().


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d4917a21
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d4917a21
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d4917a21

Branch: refs/heads/master
Commit: d4917a217f7db21b9489299e39cd7f28428be9af
Parents: c53bc78
Author: Chao Shi <ch...@apache.org>
Authored: Sat Feb 22 18:22:08 2014 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Sun Mar 30 00:18:38 2014 +0800

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/lib/ShardIT.java  | 53 ++++++++++++++++++++
 .../main/java/org/apache/crunch/lib/Shard.java  | 32 +++++-------
 2 files changed, 66 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/d4917a21/crunch-core/src/it/java/org/apache/crunch/lib/ShardIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/ShardIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/ShardIT.java
new file mode 100644
index 0000000..248c260
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/ShardIT.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultiset;
+import org.apache.commons.io.FileUtils;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class ShardIT {
+
+  @Rule
+  public TemporaryPath tempDir = new TemporaryPath("crunch.tmp.dir", "hadoop.tmp.dir");
+
+  @Test
+  public void testShard() throws Exception {
+    File inDir = tempDir.getFile("in");
+    FileUtils.writeLines(new File(inDir, "part1"), ImmutableList.of("part1", "part1"));
+    FileUtils.writeLines(new File(inDir, "part2"), ImmutableList.of("part2"));
+    Pipeline pipeline = new MRPipeline(ShardIT.class);
+    PCollection<String> in = pipeline.read(From.textFile(inDir.getPath()));
+    // We can only test on 1 shard here, as local MR does not support multiple reducers.
+    PCollection<String> out = Shard.shard(in, 1);
+    assertEquals(
+        ImmutableMultiset.copyOf(out.materialize()),
+        ImmutableMultiset.of("part1", "part1", "part2"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/d4917a21/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java b/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java
index 07ba0db..aab791b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java
@@ -17,11 +17,8 @@
  */
 package org.apache.crunch.lib;
 
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
 
 /**
  * Utilities for controlling how the data in a {@code PCollection} is balanced across reducers
@@ -39,27 +36,24 @@ public class Shard {
    * @return A rebalanced {@code PCollection<T>} with the same contents as the input
    */
   public static <T> PCollection<T> shard(PCollection<T> pc, int numPartitions) {
-    PType<T> pt = pc.getPType();
-    return Aggregate.count(pc, numPartitions).parallelDo("shards", new ShardFn<T>(pt), pt);
+    return pc.by(new ShardFn<T>(), pc.getTypeFamily().ints())
+        .groupByKey(numPartitions)
+        .ungroup()
+        .values();
   }
   
-  private static class ShardFn<T> extends DoFn<Pair<T, Long>, T> {
-    private final PType<T> ptype;
-    
-    public ShardFn(PType<T> ptype) {
-      this.ptype = ptype;
-    }
-    
+  private static class ShardFn<T> extends MapFn<T, Integer> {
+
+    private int count;
+
     @Override
     public void initialize() {
-      ptype.initialize(getConfiguration());
+      count = 0;
     }
-    
+
     @Override
-    public void process(Pair<T, Long> input, Emitter<T> emitter) {
-      for (int i = 0; i < input.second(); i++) {
-        emitter.emit(ptype.getDetachedValue(input.first()));
-      }
+    public Integer map(T input) {
+      return count++;
     }
   }
 }


[2/3] git commit: Merge branch 'crunch-364'

Posted by ch...@apache.org.
Merge branch 'crunch-364'


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/c53bc785
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/c53bc785
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/c53bc785

Branch: refs/heads/master
Commit: c53bc7857cecec70e1c9c573a501250adb751f3d
Parents: d39923d e03cd2b
Author: Chao Shi <ch...@apache.org>
Authored: Sun Mar 30 00:17:32 2014 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Sun Mar 30 00:17:32 2014 +0800

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/c53bc785/pom.xml
----------------------------------------------------------------------