You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/12/10 07:30:24 UTC
crunch git commit: CRUNCH-580: Use thread pools in
org.apache.crunch.io.impl.FileTargetImpl#handleOutputs for file renaming.
Repository: crunch
Updated Branches:
refs/heads/master 86ecd82d9 -> 46b33437a
CRUNCH-580: Use thread pools in org.apache.crunch.io.impl.FileTargetImpl#handleOutputs for file renaming.
Signed-off-by: Josh Wills <jw...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/46b33437
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/46b33437
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/46b33437
Branch: refs/heads/master
Commit: 46b33437ad19fea88446ceb18ed34e4f3f88a7eb
Parents: 86ecd82
Author: Jeff Quinn <je...@nuna.com>
Authored: Wed Nov 25 10:11:04 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Dec 9 20:45:36 2015 -0800
----------------------------------------------------------------------
.../crunch/impl/mr/run/RuntimeParameters.java | 2 +
.../apache/crunch/io/impl/FileTargetImpl.java | 71 ++++++++++++++++++--
.../crunch/io/impl/FileTargetImplTest.java | 62 +++++++++++++++++
3 files changed, 129 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/46b33437/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 07abf11..fe6f7ee 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -43,6 +43,8 @@ public final class RuntimeParameters {
public static final String MAX_RUNNING_JOBS = "crunch.max.running.jobs";
+ /**
+ * Configuration key for the size of the thread pool that FileTargetImpl#handleOutputs uses to
+ * move output files; handleOutputs reads it with a default of 1 thread.
+ */
+ public static final String FILE_TARGET_MAX_THREADS = "crunch.file.target.max.threads";
+
// Not instantiated
private RuntimeParameters() {
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/46b33437/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index e87485d..5f4cfbb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -18,16 +18,28 @@
package org.apache.crunch.io.impl;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.FileNamingScheme;
import org.apache.crunch.io.FormatBundle;
@@ -118,6 +130,35 @@ public class FileTargetImpl implements PathTarget {
return ptype.getConverter();
}
+ /**
+ * Task that moves one file to its destination: a rename when source and destination share a
+ * file system, otherwise a copy that deletes the source and overwrites the destination.
+ *
+ * Declared static because it never touches the enclosing FileTargetImpl, so queued tasks do
+ * not hold a reference to it; fields are final because instances are handed to pool threads.
+ */
+ private static class WorkingPathFileMover implements Callable<Boolean> {
+ private final Configuration conf;
+ private final Path src;
+ private final Path dst;
+ private final FileSystem srcFs;
+ private final FileSystem dstFs;
+ private final boolean sameFs;
+
+ /**
+ * @param conf configuration passed to FileUtil#copy for cross-file-system moves
+ * @param src file to move
+ * @param dst destination of the move
+ * @param srcFs file system that owns src
+ * @param dstFs file system that owns dst
+ * @param sameFs true to rename within srcFs, false to copy from srcFs to dstFs
+ */
+ public WorkingPathFileMover(Configuration conf, Path src, Path dst,
+ FileSystem srcFs, FileSystem dstFs, boolean sameFs) {
+ this.conf = conf;
+ this.src = src;
+ this.dst = dst;
+ this.srcFs = srcFs;
+ this.dstFs = dstFs;
+ this.sameFs = sameFs;
+ }
+
+ /**
+ * @return the value reported by FileSystem#rename or FileUtil#copy
+ * @throws IOException if the underlying file system operation fails
+ */
+ @Override
+ public Boolean call() throws IOException {
+ if (sameFs) {
+ return srcFs.rename(src, dst);
+ }
+ // Arguments after dst: deleteSource = true, overwrite = true.
+ return FileUtil.copy(srcFs, src, dstFs, dst, true, true, conf);
+ }
+ }
+
@Override
public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
FileSystem srcFs = workingPath.getFileSystem(conf);
@@ -128,15 +169,33 @@ public class FileTargetImpl implements PathTarget {
dstFs.mkdirs(path);
}
boolean sameFs = isCompatible(srcFs, path);
+ List<ListenableFuture<Boolean>> renameFutures = Lists.newArrayList();
+ ListeningExecutorService executorService =
+ MoreExecutors.listeningDecorator(
+ Executors.newFixedThreadPool(
+ conf.getInt(RuntimeParameters.FILE_TARGET_MAX_THREADS, 1)));
for (Path s : srcs) {
Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
- if (sameFs) {
- srcFs.rename(s, d);
- } else {
- FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
- }
+ renameFutures.add(
+ executorService.submit(
+ new WorkingPathFileMover(conf, s, d, srcFs, dstFs, sameFs)));
+ }
+ LOG.debug("Renaming " + renameFutures.size() + " files.");
+
+ ListenableFuture<List<Boolean>> future =
+ Futures.successfulAsList(renameFutures);
+ List<Boolean> renameResults = null;
+ try {
+ renameResults = future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ Throwables.propagate(e);
+ } finally {
+ executorService.shutdownNow();
+ }
+ if (renameResults != null && !renameResults.contains(false)) {
+ dstFs.create(getSuccessIndicator(), true).close();
+ LOG.debug("Renamed " + renameFutures.size() + " files.");
}
- dstFs.create(getSuccessIndicator(), true).close();
}
protected Path getSuccessIndicator() {
http://git-wip-us.apache.org/repos/asf/crunch/blob/46b33437/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java b/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java
new file mode 100644
index 0000000..6bc13d2
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks that FileTargetImpl#handleOutputs moves part files from a working directory into the
+ * target directory with their contents intact.
+ */
+public class FileTargetImplTest {
+
+ @Rule
+ public TemporaryFolder TMP = new TemporaryFolder();
+
+ @Test
+ public void testHandleOutputsMovesFilesToDestination() throws Exception {
+ java.nio.file.Path testWorkingPath = TMP.newFolder().toPath();
+ java.nio.file.Path testDestinationPath = TMP.newFolder().toPath();
+ FileTargetImpl fileTarget = new FileTargetImpl(
+ new Path(testDestinationPath.toAbsolutePath().toString()),
+ SequenceFileOutputFormat.class,
+ SequentialFileNamingScheme.getInstance());
+
+ File testPart1 = new File(testWorkingPath.toAbsolutePath().toString(), "part-m-00000");
+ File testPart2 = new File(testWorkingPath.toAbsolutePath().toString(), "part-m-00001");
+ FileUtils.writeStringToFile(testPart1, "test1");
+ FileUtils.writeStringToFile(testPart2, "test2");
+ fileTarget.handleOutputs(new Configuration(),
+ new Path(testWorkingPath.toAbsolutePath().toString()),
+ -1);
+
+ // JUnit's assertEquals takes (expected, actual); keeping that order makes failure
+ // messages report the two values the right way round.
+ assertEquals("test1",
+ FileUtils.readFileToString(
+ new File(testDestinationPath.toAbsolutePath().toString(), "part-m-00000")));
+ assertEquals("test2",
+ FileUtils.readFileToString(
+ new File(testDestinationPath.toAbsolutePath().toString(), "part-m-00001")));
+ }
+}
\ No newline at end of file