You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/12/10 07:30:24 UTC

crunch git commit: CRUNCH-580: Use thread pools in org.apache.crunch.io.impl.FileTargetImpl#handleOutputs for file renaming.

Repository: crunch
Updated Branches:
  refs/heads/master 86ecd82d9 -> 46b33437a


CRUNCH-580: Use thread pools in org.apache.crunch.io.impl.FileTargetImpl#handleOutputs for file renaming.

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/46b33437
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/46b33437
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/46b33437

Branch: refs/heads/master
Commit: 46b33437ad19fea88446ceb18ed34e4f3f88a7eb
Parents: 86ecd82
Author: Jeff Quinn <je...@nuna.com>
Authored: Wed Nov 25 10:11:04 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Dec 9 20:45:36 2015 -0800

----------------------------------------------------------------------
 .../crunch/impl/mr/run/RuntimeParameters.java   |  2 +
 .../apache/crunch/io/impl/FileTargetImpl.java   | 71 ++++++++++++++++++--
 .../crunch/io/impl/FileTargetImplTest.java      | 62 +++++++++++++++++
 3 files changed, 129 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/46b33437/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 07abf11..fe6f7ee 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -43,6 +43,8 @@ public final class RuntimeParameters {
 
   public static final String MAX_RUNNING_JOBS = "crunch.max.running.jobs";
 
+  public static final String FILE_TARGET_MAX_THREADS = "crunch.file.target.max.threads";
+
   // Not instantiated
   private RuntimeParameters() {
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/46b33437/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index e87485d..5f4cfbb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -18,16 +18,28 @@
 package org.apache.crunch.io.impl;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.FormatBundle;
@@ -118,6 +130,35 @@ public class FileTargetImpl implements PathTarget {
     return ptype.getConverter();
   }
 
+  /** Moves one job output file to its destination: rename on a shared filesystem, else copy. */
+  private static class WorkingPathFileMover implements Callable<Boolean> {
+    private final Configuration conf;
+    private final Path src;
+    private final Path dst;
+    private final FileSystem srcFs;
+    private final FileSystem dstFs;
+    private final boolean sameFs;
+
+    public WorkingPathFileMover(Configuration conf, Path src, Path dst,
+                                FileSystem srcFs, FileSystem dstFs, boolean sameFs) {
+      this.conf = conf;
+      this.src = src;
+      this.dst = dst;
+      this.srcFs = srcFs;
+      this.dstFs = dstFs;
+      this.sameFs = sameFs;
+    }
+
+    @Override
+    public Boolean call() throws IOException {
+      if (sameFs) {
+        return srcFs.rename(src, dst);
+      }
+      // Cross-filesystem move: copy with deleteSource=true and overwrite=true.
+      return FileUtil.copy(srcFs, src, dstFs, dst, true, true, conf);
+    }
+  }
+
   @Override
   public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
     FileSystem srcFs = workingPath.getFileSystem(conf);
@@ -128,15 +169,33 @@ public class FileTargetImpl implements PathTarget {
       dstFs.mkdirs(path);
     }
     boolean sameFs = isCompatible(srcFs, path);
+    List<ListenableFuture<Boolean>> renameFutures = Lists.newArrayList();
+    ListeningExecutorService executorService =
+        MoreExecutors.listeningDecorator(
+            Executors.newFixedThreadPool(
+                conf.getInt(RuntimeParameters.FILE_TARGET_MAX_THREADS, 1)));
     for (Path s : srcs) {
       Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
-      if (sameFs) {
-        srcFs.rename(s, d);
-      } else {
-        FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
-      }
+      renameFutures.add(
+          executorService.submit(
+              new WorkingPathFileMover(conf, s, d, srcFs, dstFs, sameFs)));
+    }
+    LOG.debug("Renaming " + renameFutures.size() + " files.");
+
+    ListenableFuture<List<Boolean>> future =
+        Futures.successfulAsList(renameFutures);
+    List<Boolean> renameResults = null;
+    try {
+      renameResults = future.get();
+    } catch (InterruptedException | ExecutionException e) {
+      Throwables.propagate(e);
+    } finally {
+      executorService.shutdownNow();
+    }
+    if (renameResults != null && !renameResults.contains(false)) {
+      dstFs.create(getSuccessIndicator(), true).close();
+      LOG.debug("Renamed " + renameFutures.size() + " files.");
     }
-    dstFs.create(getSuccessIndicator(), true).close();
   }
   
   protected Path getSuccessIndicator() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/46b33437/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java b/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java
new file mode 100644
index 0000000..6bc13d2
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+/** Unit tests for {@link FileTargetImpl}. */
+public class FileTargetImplTest {
+
+  @Rule
+  public TemporaryFolder tmp = new TemporaryFolder();
+
+  /** Files written under the working path must end up, contents intact, in the target path. */
+  @Test
+  public void testHandleOutputsMovesFilesToDestination() throws Exception {
+    File workingDir = tmp.newFolder();
+    File destinationDir = tmp.newFolder();
+    FileTargetImpl fileTarget = new FileTargetImpl(
+        new Path(destinationDir.getAbsolutePath()),
+        SequenceFileOutputFormat.class,
+        SequentialFileNamingScheme.getInstance());
+
+    // Names contain "-m-", which handleOutputs checks when choosing destination names.
+    File testPart1 = new File(workingDir, "part-m-00000");
+    File testPart2 = new File(workingDir, "part-m-00001");
+    FileUtils.writeStringToFile(testPart1, "test1");
+    FileUtils.writeStringToFile(testPart2, "test2");
+    fileTarget.handleOutputs(new Configuration(), new Path(workingDir.getAbsolutePath()), -1);
+
+    // JUnit's assertEquals takes the expected value first, then the actual one.
+    assertEquals("test1",
+        FileUtils.readFileToString(new File(destinationDir, "part-m-00000")));
+    assertEquals("test2",
+        FileUtils.readFileToString(new File(destinationDir, "part-m-00001")));
+  }
+}
\ No newline at end of file