You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2017/03/20 20:47:11 UTC
crunch git commit: CRUNCH-636: Make replication factor for temporary files configurable
Repository: crunch
Updated Branches:
refs/heads/master e520d9f6e -> e176b6166
CRUNCH-636: Make replication factor for temporary files configurable
Signed-off-by: Josh Wills <jw...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e176b616
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e176b616
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e176b616
Branch: refs/heads/master
Commit: e176b6166218fabc247eef25cbbc549271f8bd2d
Parents: e520d9f
Author: Attila Sasvari <as...@cloudera.com>
Authored: Mon Mar 20 11:17:55 2017 +0100
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Mar 20 11:53:55 2017 -0700
----------------------------------------------------------------------
.../crunch/impl/dist/DistributedPipeline.java | 40 ++++++-
.../crunch/impl/mr/plan/JobPrototype.java | 94 ++++++++++++++-
.../impl/dist/DistributedPipelineTest.java | 96 +++++++++++++++
.../crunch/impl/mr/plan/JobPrototypeTest.java | 117 +++++++++++++++++++
4 files changed, 344 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index d3fb0d0..1deafd5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -17,6 +17,7 @@
*/
package org.apache.crunch.impl.dist;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -51,6 +52,7 @@ import org.apache.crunch.io.From;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.crunch.io.To;
+import org.apache.crunch.io.impl.FileTargetImpl;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
@@ -58,6 +60,7 @@ import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +74,7 @@ public abstract class DistributedPipeline implements Pipeline {
private static final Logger LOG = LoggerFactory.getLogger(DistributedPipeline.class);
private static final Random RANDOM = new Random();
+ /** Configuration key under which createTempPath() records its paths as a ':'-separated list. */
+ private static final String CRUNCH_TMP_DIRS = "crunch.tmp.dirs";
private final String name;
protected final PCollectionFactory factory;
@@ -103,6 +107,22 @@ public abstract class DistributedPipeline implements Pipeline {
this.nextAnonymousStageId = 0;
}
+  /**
+   * Returns true when {@code outputPath} contains one of the temporary paths recorded under
+   * "crunch.tmp.dirs" in the job's configuration.
+   *
+   * <p>The recorded value is a ':'-separated list of path strings. Qualified Hadoop paths contain
+   * ':' themselves (e.g. {@code hdfs://nn:8020/tmp/crunch-1/p1}). A plain {@code split(":")}
+   * would turn them into fragments such as {@code "hdfs"} that match nearly every output. The
+   * list is therefore split by {@link #splitTempDirs(String)}, which keeps scheme and port
+   * separators attached to their entry. Empty entries are ignored, since an empty entry would
+   * otherwise match any path.
+   *
+   * <p>A recorded path only counts as found when the character that follows it in
+   * {@code outputPath}, if any, is not a letter or digit. This keeps {@code .../p1} from matching
+   * an output under {@code .../p10}.
+   *
+   * @param job the job whose configuration holds the recorded temporary paths
+   * @param outputPath the output location to check; callers pass a target's {@code toString()},
+   *     which presumably embeds its path (TODO confirm for non-file targets)
+   * @return whether {@code outputPath} refers to a recorded temporary path; false if nothing is
+   *     recorded or {@code outputPath} is null
+   */
+  public static boolean isTempDir(Job job, String outputPath) {
+    String tmpDirs = job.getConfiguration().get(CRUNCH_TMP_DIRS);
+    if (tmpDirs == null || outputPath == null) {
+      return false;
+    }
+
+    for (String p : splitTempDirs(tmpDirs)) {
+      if (containsWholePath(outputPath, p)) {
+        LOG.debug("Matched temporary directory : {} in {}", p, outputPath);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Splits a ':'-separated list of path strings. Empty entries are dropped. A ':' stays inside
+   * the current entry in two cases:
+   * <ul>
+   *   <li>the text so far has no '/', i.e. it is a URI scheme such as {@code hdfs} or
+   *       {@code file};</li>
+   *   <li>the next piece starts with a digit, i.e. it is a port such as {@code 8020/tmp}.</li>
+   * </ul>
+   */
+  private static java.util.List<String> splitTempDirs(String tmpDirs) {
+    java.util.List<String> entries = Lists.newArrayList();
+    String current = "";
+    for (String piece : tmpDirs.split(":", -1)) {
+      boolean continuesEntry = !current.isEmpty()
+          && (current.indexOf('/') < 0
+              || (!piece.isEmpty() && Character.isDigit(piece.charAt(0))));
+      if (continuesEntry) {
+        current = current + ":" + piece;
+      } else {
+        if (!current.isEmpty()) {
+          entries.add(current);
+        }
+        current = piece;
+      }
+    }
+    if (!current.isEmpty()) {
+      entries.add(current);
+    }
+    return entries;
+  }
+
+  /**
+   * Returns true if {@code path} occurs in {@code text} and is not immediately followed by a
+   * letter or digit, so a recorded {@code .../p1} is not found inside {@code .../p10}.
+   */
+  private static boolean containsWholePath(String text, String path) {
+    for (int i = text.indexOf(path); i >= 0; i = text.indexOf(path, i + 1)) {
+      int end = i + path.length();
+      if (end == text.length() || !Character.isLetterOrDigit(text.charAt(end))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+ /** Returns this pipeline's {@code factory}. */
public PCollectionFactory getFactory() {
return factory;
}
@@ -390,7 +410,25 @@ public abstract class DistributedPipeline implements Pipeline {
+ /**
+  * Returns a new path named "p" plus an incremented counter under getTempDirectory(), after
+  * recording it with storeTempDirLocation() so later jobs can recognise it as temporary output.
+  */
public Path createTempPath() {
tempFileIndex++;
- return new Path(getTempDirectory(), "p" + tempFileIndex);
+ Path path = new Path(getTempDirectory(), "p" + tempFileIndex);
+ storeTempDirLocation(path);
+ return path;
+ }
+
+  /** Returns true if {@code entry} is one complete element of the ':'-separated {@code list}. */
+  private static boolean isRecordedTempDir(String list, String entry) {
+    return list.equals(entry)
+        || list.startsWith(entry + ":")
+        || list.endsWith(":" + entry)
+        || list.contains(":" + entry + ":");
+  }
+
+  /**
+   * Appends {@code t} to the ':'-separated "crunch.tmp.dirs" list in this pipeline's
+   * configuration, unless it is already listed.
+   *
+   * <p>Membership is checked against whole list elements, not substrings. As a result,
+   * {@code .../p1} is still added when {@code .../p10} is already listed. An empty stored value
+   * is replaced outright instead of producing a list that starts with ':'.
+   *
+   * @param t the temporary path to record
+   */
+  @VisibleForTesting
+  protected void storeTempDirLocation(Path t) {
+    String tmpDir = t.toString();
+    String tmpCfg = conf.get(CRUNCH_TMP_DIRS);
+
+    LOG.debug("Temporary directory created: {}", tmpDir);
+
+    if (tmpCfg == null || tmpCfg.isEmpty()) {
+      conf.set(CRUNCH_TMP_DIRS, tmpDir);
+    } else if (!isRecordedTempDir(tmpCfg, tmpDir)) {
+      conf.set(CRUNCH_TMP_DIRS, tmpCfg + ":" + tmpDir);
+    }
}
private synchronized Path getTempDirectory() {
http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index d23de3b..d31bfad 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -23,9 +23,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.crunch.Pipeline;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.crunch.Target;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.impl.dist.DistributedPipeline;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.mr.collect.DoTable;
@@ -39,9 +40,11 @@ import org.apache.crunch.impl.mr.run.CrunchOutputFormat;
import org.apache.crunch.impl.mr.run.CrunchReducer;
import org.apache.crunch.impl.mr.run.NodeContext;
import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.io.impl.FileTargetImpl;
import org.apache.crunch.types.PType;
import org.apache.crunch.util.DistCache;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -51,11 +54,18 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
class JobPrototype {
+ /** Replication property that setJobReplication() overrides on a job's configuration. */
+ private static final String DFS_REPLICATION = "dfs.replication";
+ /** Key remembering the dfs.replication value to restore for jobs with non-temporary output. */
+ private static final String DFS_REPLICATION_INITIAL = "dfs.replication.initial";
+ /** Optional user setting: replication for jobs whose outputs are all temporary; unset disables it. */
+ private static final String CRUNCH_TMP_DIR_REPLICATION = "crunch.tmp.dir.replication";
+
+ /**
+  * Creates the prototype of a job that has a grouping stage ({@code group}); simply delegates to
+  * the corresponding JobPrototype constructor.
+  */
public static JobPrototype createMapReduceJob(int jobID, PGroupedTableImpl<?, ?> group,
- Set<NodePath> inputs, Path workingPath) {
+ Set<NodePath> inputs, Path workingPath) {
return new JobPrototype(jobID, inputs, group, workingPath);
}
@@ -84,6 +94,7 @@ class JobPrototype {
this.targetsToNodePaths = null;
}
+ @VisibleForTesting
private JobPrototype(int jobID, HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
this.jobID = jobID;
this.group = null;
@@ -141,9 +152,13 @@ class JobPrototype {
return job;
}
+ // Logger for the replication-related debug/warn messages emitted by this class.
+ // NOTE(review): conventionally declared with the other static fields at the top of the class.
+ private static final Logger LOG = LoggerFactory.getLogger(JobPrototype.class);
+
private CrunchControlledJob build(
Class<?> jarClass, Configuration conf, MRPipeline pipeline, int numOfJobs) throws IOException {
+
Job job = new Job(conf);
+ LOG.debug(String.format("Replication factor: %s", job.getConfiguration().get(DFS_REPLICATION)));
conf = job.getConfiguration();
conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
job.setJarByClass(jarClass);
@@ -152,19 +167,28 @@ class JobPrototype {
Set<Target> allTargets = Sets.newHashSet();
Path outputPath = new Path(workingPath, "output");
MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null);
+
+ boolean onlyHasTemporaryOutput =true;
+
for (Target target : targetsToNodePaths.keySet()) {
DoNode node = null;
+ LOG.debug("Target path: " + target);
for (NodePath nodePath : targetsToNodePaths.get(target)) {
if (node == null) {
PType<?> ptype = nodePath.tail().getPType();
node = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype);
outputHandler.configureNode(node, target);
+
+ onlyHasTemporaryOutput &= DistributedPipeline.isTempDir(job, target.toString());
}
outputNodes.add(walkPath(nodePath.descendingIterator(), node));
}
allTargets.add(target);
}
+ setJobReplication(job.getConfiguration(), onlyHasTemporaryOutput);
+
+
Set<DoNode> mapSideNodes = Sets.newHashSet();
if (mapSideNodePaths != null) {
for (Target target : mapSideNodePaths.keySet()) {
@@ -243,6 +267,72 @@ class JobPrototype {
completionHook);
}
+  /**
+   * Chooses the dfs.replication value for a job.
+   *
+   * <p>Does nothing unless the user set "crunch.tmp.dir.replication". Otherwise it first captures
+   * the replication in effect before any override (see
+   * {@link #handleInitialReplication(Configuration)}) and then:
+   * <ul>
+   *   <li>if every output of the job is a temporary path, dfs.replication is set to the
+   *       user-supplied temp-dir replication;</li>
+   *   <li>otherwise dfs.replication is set back to the captured initial value.</li>
+   * </ul>
+   * If no initial value could be determined, dfs.replication is left untouched. Passing null
+   * would make {@link Configuration#set(String, String)} throw.
+   *
+   * @param jobConfiguration the configuration of the job being built; modified in place
+   * @param onlyHasTemporaryOutput whether all of the job's targets are temporary paths
+   */
+  @VisibleForTesting
+  protected void setJobReplication(Configuration jobConfiguration, boolean onlyHasTemporaryOutput) {
+    String userSuppliedTmpDirReplication = jobConfiguration.get(CRUNCH_TMP_DIR_REPLICATION);
+    if (userSuppliedTmpDirReplication == null) {
+      return;
+    }
+
+    handleInitialReplication(jobConfiguration);
+
+    if (onlyHasTemporaryOutput) {
+      LOG.debug("Setting replication factor to: {} ", userSuppliedTmpDirReplication);
+      jobConfiguration.set(DFS_REPLICATION, userSuppliedTmpDirReplication);
+      return;
+    }
+
+    String originalReplication = jobConfiguration.get(DFS_REPLICATION_INITIAL);
+    if (originalReplication == null) {
+      LOG.debug("Initial replication factor is unknown; leaving {} unchanged", DFS_REPLICATION);
+      return;
+    }
+    LOG.debug("Using initial replication factor ({})", originalReplication);
+    jobConfiguration.set(DFS_REPLICATION, originalReplication);
+  }
+
+  /**
+   * Records in "dfs.replication.initial" the replication factor to restore for jobs that write
+   * non-temporary output. Does nothing if that key is already set.
+   *
+   * <p>Sources, in order:
+   * <ol>
+   *   <li>an explicit dfs.replication in {@code jobConfiguration};</li>
+   *   <li>the dfs.replication of the FileSystem backing the first {@link FileTargetImpl} among the
+   *       job's targets, or "3" if that FileSystem cannot be obtained.</li>
+   * </ol>
+   * If no value can be found (no file target, or the FileSystem's configuration lacks the
+   * property), the key is left unset instead of passing null to
+   * {@link Configuration#set(String, String)}. An empty or missing target map is tolerated.
+   *
+   * @param jobConfiguration the job configuration to read from and update
+   */
+  @VisibleForTesting
+  protected void handleInitialReplication(Configuration jobConfiguration) {
+    String origReplication = jobConfiguration.get(DFS_REPLICATION_INITIAL);
+    if (origReplication != null) {
+      LOG.debug("Initial replication has been already set ({}); nothing to do.", origReplication);
+      return;
+    }
+
+    String defaultReplication = jobConfiguration.get(DFS_REPLICATION);
+    if (defaultReplication != null) {
+      LOG.debug("Using dfs.replication ({}) set by user as initial replication.", defaultReplication);
+      setInitialJobReplicationConfig(jobConfiguration, defaultReplication);
+      return;
+    }
+
+    Path outputPath = firstFileTargetPath();
+    if (outputPath != null) {
+      defaultReplication = tryGetDefaultReplicationFromFileSystem(jobConfiguration, outputPath, "3");
+    }
+
+    if (defaultReplication == null) {
+      LOG.debug("Could not determine an initial replication factor; leaving it unset.");
+      return;
+    }
+    setInitialJobReplicationConfig(jobConfiguration, defaultReplication);
+  }
+
+  /**
+   * Returns the path of the first {@link FileTargetImpl}, in iteration order, among this job's
+   * targets, or null when there are no targets or none of them is a file target.
+   */
+  private Path firstFileTargetPath() {
+    if (targetsToNodePaths == null) {
+      return null;
+    }
+    for (Target target : targetsToNodePaths.keySet()) {
+      if (target instanceof FileTargetImpl) {
+        return ((FileTargetImpl) target).getPath();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Reads dfs.replication from the configuration of the FileSystem that backs {@code path}.
+   *
+   * @param jobConf configuration used to resolve the FileSystem
+   * @param path an output path of the job
+   * @param defaultReplication value returned when the FileSystem cannot be obtained
+   * @return the FileSystem configuration's dfs.replication, which may be null if that
+   *     configuration does not set it; or {@code defaultReplication} if resolving the FileSystem
+   *     throws an IOException (the exception is logged, not rethrown)
+   */
+  private String tryGetDefaultReplicationFromFileSystem(
+      Configuration jobConf, Path path, String defaultReplication) {
+    try {
+      FileSystem fs = path.getFileSystem(jobConf);
+      String replication = fs.getConf().get(DFS_REPLICATION);
+      LOG.debug("Using dfs.replication ({}) from the filesystem configuration of {} as initial replication.",
+          replication, path);
+      return replication;
+    } catch (IOException e) {
+      LOG.warn("Cannot get the filesystem for {}. Setting initial replication to {}.",
+          path, defaultReplication, e);
+      return defaultReplication;
+    }
+  }
+
+  /**
+   * Stores {@code replication} under "dfs.replication.initial" in {@code jobConfiguration}.
+   * A null value is ignored, because {@link Configuration#set(String, String)} rejects nulls.
+   */
+  private void setInitialJobReplicationConfig(Configuration jobConfiguration, String replication) {
+    if (replication == null) {
+      LOG.debug("No initial replication factor to store; leaving {} unset.", DFS_REPLICATION_INITIAL);
+      return;
+    }
+    jobConfiguration.set(DFS_REPLICATION_INITIAL, replication);
+  }
+
private static CrunchControlledJob.Hook getHook(
CrunchControlledJob.Hook base,
List<CrunchControlledJob.Hook> optional) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java
new file mode 100644
index 0000000..40e4bf0
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.mockito.Mockito.when;
+
+/** Tests for how DistributedPipeline records and recognises temporary directories. */
+@RunWith(MockitoJUnitRunner.class)
+public class DistributedPipelineTest {
+  private static final String TMP_DIRS_KEY = "crunch.tmp.dirs";
+  private static final String TEST_TEMP_DIR_PATH_1 = "/tmp/crunch-1345424622/p1";
+  private static final String TEST_TEMP_DIR_PATH_2 = "/tmp/crunch-1345424622/p2";
+
+  @Mock private Job mockJob;
+  private final Configuration testConfiguration = new Configuration();
+
+  @Before
+  public void setUp() {
+    when(mockJob.getConfiguration()).thenReturn(testConfiguration);
+  }
+
+  @Test
+  public void isTempDirFalseWhenCrunchCreatesNoDirs() {
+    Assert.assertFalse(DistributedPipeline.isTempDir(mockJob, TEST_TEMP_DIR_PATH_1));
+  }
+
+  @Test
+  public void isTempDirTrueWhenFileIsInTempDir() {
+    testConfiguration.set(TMP_DIRS_KEY, TEST_TEMP_DIR_PATH_1);
+    Assert.assertTrue(DistributedPipeline.isTempDir(mockJob, TEST_TEMP_DIR_PATH_1));
+  }
+
+  @Test
+  public void isTempDirFalseWhenFileIsNotInTempDir() {
+    testConfiguration.set(TMP_DIRS_KEY, TEST_TEMP_DIR_PATH_1);
+    Assert.assertFalse(DistributedPipeline.isTempDir(mockJob, "/user/crunch/iwTV2/"));
+  }
+
+  @Test
+  public void tempDirsAreStoredInPipelineConf() {
+    DistributedPipeline distributedPipeline =
+        Mockito.mock(DistributedPipeline.class, Mockito.CALLS_REAL_METHODS);
+    distributedPipeline.setConfiguration(new Configuration());
+    String bothDirs = String.format("%s:%s", TEST_TEMP_DIR_PATH_1, TEST_TEMP_DIR_PATH_2);
+
+    // no temp directory is present at startup
+    Assert.assertNull(distributedPipeline.getConfiguration().get(TMP_DIRS_KEY));
+
+    // store a temp directory
+    distributedPipeline.storeTempDirLocation(new Path(TEST_TEMP_DIR_PATH_1));
+    Assert.assertEquals(TEST_TEMP_DIR_PATH_1, distributedPipeline.getConfiguration().get(TMP_DIRS_KEY));
+
+    // store one more temp directory
+    distributedPipeline.storeTempDirLocation(new Path(TEST_TEMP_DIR_PATH_2));
+    Assert.assertEquals(bothDirs, distributedPipeline.getConfiguration().get(TMP_DIRS_KEY));
+
+    // try to store the first temp directory again, not added again
+    distributedPipeline.storeTempDirLocation(new Path(TEST_TEMP_DIR_PATH_1));
+    Assert.assertEquals(bothDirs, distributedPipeline.getConfiguration().get(TMP_DIRS_KEY));
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java
new file mode 100644
index 0000000..44da01a
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import com.google.common.collect.HashMultimap;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
+/** Tests for JobPrototype's handling of the temporary-output replication factor. */
+@RunWith(MockitoJUnitRunner.class)
+public class JobPrototypeTest {
+
+  public static final String DFS_REPLICATION = "dfs.replication";
+  public static final String TEST_INITIAL_DFS_REPLICATION = "42";
+  public static final String TEST_TMP_DIR_REPLICATION = "1";
+  private static final String DFS_REPLICATION_INITIAL = "dfs.replication.initial";
+  private static final String CRUNCH_TMP_DIR_REPLICATION = "crunch.tmp.dir.replication";
+  /** Value handleInitialReplication falls back to when the output FileSystem is unavailable. */
+  private static final String FALLBACK_REPLICATION = "3";
+
+  @Mock private Path mockPath;
+  @Mock private FileTargetImpl mockTarget;
+  @Mock private FileSystem mockFs;
+  @Mock private PGroupedTableImpl<String, String> mockPgroup;
+  @Mock private Set<NodePath> mockInputs;
+  private JobPrototype jobPrototypeUnderTest;
+  private final Configuration testConfiguration = new Configuration();
+
+  @Before
+  public void setUp() {
+    testConfiguration.set(DFS_REPLICATION_INITIAL, TEST_INITIAL_DFS_REPLICATION);
+    testConfiguration.set(CRUNCH_TMP_DIR_REPLICATION, TEST_TMP_DIR_REPLICATION);
+    doReturn(new Object[]{}).when(mockInputs).toArray();
+    jobPrototypeUnderTest = JobPrototype.createMapReduceJob(42, mockPgroup, mockInputs, mockPath);
+  }
+
+  @Test
+  public void tmpDirReplicationFactorSetWhenJobOnlyHasTemporaryOutput() {
+    jobPrototypeUnderTest.setJobReplication(testConfiguration, true);
+
+    assertEquals(TEST_TMP_DIR_REPLICATION, testConfiguration.get(DFS_REPLICATION));
+  }
+
+  @Test
+  public void initialReplicationFactorSetWhenJobHasNonTemporaryOutput() {
+    jobPrototypeUnderTest.setJobReplication(testConfiguration, false);
+
+    assertEquals(TEST_INITIAL_DFS_REPLICATION, testConfiguration.get(DFS_REPLICATION));
+  }
+
+  @Test
+  public void initialReplicationFactorSetIfUserSpecified() {
+    jobPrototypeUnderTest.handleInitialReplication(testConfiguration);
+
+    assertEquals(TEST_INITIAL_DFS_REPLICATION, testConfiguration.get(DFS_REPLICATION_INITIAL));
+  }
+
+  @Test
+  public void initialReplicationFactorUsedFromFileSystem() throws IOException {
+    Configuration jobConf = new Configuration();
+    addFileTargetToPrototype();
+    Configuration fsConf = new Configuration();
+    fsConf.set(DFS_REPLICATION, TEST_INITIAL_DFS_REPLICATION);
+    doReturn(mockFs).when(mockPath).getFileSystem(any(Configuration.class));
+    doReturn(fsConf).when(mockFs).getConf();
+
+    jobPrototypeUnderTest.handleInitialReplication(jobConf);
+    assertEquals(TEST_INITIAL_DFS_REPLICATION, jobConf.get(DFS_REPLICATION_INITIAL));
+  }
+
+  @Test
+  public void defaultInitialReplicationUsedWhenItCannotBeRetrievedFromFileSystem() throws IOException {
+    Configuration jobConf = new Configuration();
+    addFileTargetToPrototype();
+    doThrow(new IOException()).when(mockPath).getFileSystem(any(Configuration.class));
+
+    jobPrototypeUnderTest.handleInitialReplication(jobConf);
+    assertEquals(FALLBACK_REPLICATION, jobConf.get(DFS_REPLICATION_INITIAL));
+  }
+
+  /** Registers mockTarget (whose path is mockPath) as the prototype's only reduce-side target. */
+  private void addFileTargetToPrototype() {
+    HashMultimap<Target, NodePath> targetNodePaths = HashMultimap.create();
+    targetNodePaths.put(mockTarget, new NodePath());
+    doReturn(mockPath).when(mockTarget).getPath();
+    jobPrototypeUnderTest.addReducePaths(targetNodePaths);
+  }
+}