You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2017/03/20 20:47:11 UTC

crunch git commit: CRUNCH-636: Make replication factor for temporary files configurable

Repository: crunch
Updated Branches:
  refs/heads/master e520d9f6e -> e176b6166


CRUNCH-636: Make replication factor for temporary files configurable

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e176b616
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e176b616
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e176b616

Branch: refs/heads/master
Commit: e176b6166218fabc247eef25cbbc549271f8bd2d
Parents: e520d9f
Author: Attila Sasvari <as...@cloudera.com>
Authored: Mon Mar 20 11:17:55 2017 +0100
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Mar 20 11:53:55 2017 -0700

----------------------------------------------------------------------
 .../crunch/impl/dist/DistributedPipeline.java   |  40 ++++++-
 .../crunch/impl/mr/plan/JobPrototype.java       |  94 ++++++++++++++-
 .../impl/dist/DistributedPipelineTest.java      |  96 +++++++++++++++
 .../crunch/impl/mr/plan/JobPrototypeTest.java   | 117 +++++++++++++++++++
 4 files changed, 344 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index d3fb0d0..1deafd5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.impl.dist;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -51,6 +52,7 @@ import org.apache.crunch.io.From;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.io.To;
+import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -58,6 +60,7 @@ import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +74,7 @@ public abstract class DistributedPipeline implements Pipeline {
   private static final Logger LOG = LoggerFactory.getLogger(DistributedPipeline.class);
 
   private static final Random RANDOM = new Random();
+  private static final String CRUNCH_TMP_DIRS = "crunch.tmp.dirs";
 
   private final String name;
   protected final PCollectionFactory factory;
@@ -103,6 +107,37 @@ public abstract class DistributedPipeline implements Pipeline {
     this.nextAnonymousStageId = 0;
   }
 
+  /**
+   * Tells whether {@code outputPath} refers to one of the temporary directories recorded
+   * under the {@code crunch.tmp.dirs} property of the job's configuration.
+   *
+   * <p>The property holds a ':'-separated list; a recorded directory matches when it occurs
+   * anywhere inside {@code outputPath}.
+   * NOTE(review): because entries are split on ':', a recorded path that itself contains ':'
+   * (for example a fully qualified URI) would be cut into fragments; confirm that only
+   * unqualified paths are stored.
+   *
+   * @param job the job whose configuration is consulted
+   * @param outputPath textual form of the output location to test
+   * @return {@code true} if a recorded temporary directory is contained in {@code outputPath};
+   *         {@code false} otherwise, including when nothing has been recorded
+   */
+  public static boolean isTempDir(Job job, String outputPath) {
+    String tmpDirs = job.getConfiguration().get(CRUNCH_TMP_DIRS);
+    if (tmpDirs == null || outputPath == null) {
+      return false;
+    }
+
+    for (String tmpDir : tmpDirs.split(":")) {
+      // Skip empty entries: every string contains "", which would flag all outputs as temporary.
+      if (!tmpDir.isEmpty() && outputPath.contains(tmpDir)) {
+        LOG.debug("Matched temporary directory : {} in {}", tmpDir, outputPath);
+        return true;
+      }
+    }
+    return false;
+  }
+
   public PCollectionFactory getFactory() {
     return factory;
   }
@@ -390,7 +410,41 @@ public abstract class DistributedPipeline implements Pipeline {
 
+  /**
+   * Allocates the next temporary path, named {@code "p" + index}, under the directory returned by
+   * {@code getTempDirectory()} and records it via {@link #storeTempDirLocation(Path)}.
+   *
+   * @return the newly allocated temporary path
+   */
   public Path createTempPath() {
     tempFileIndex++;
-    return new Path(getTempDirectory(), "p" + tempFileIndex);
+    Path path = new Path(getTempDirectory(), "p" + tempFileIndex);
+    storeTempDirLocation(path);
+    return path;
+  }
+
+  /**
+   * Appends {@code t} to the ':'-separated {@code crunch.tmp.dirs} list kept in the pipeline
+   * configuration, unless that exact entry is already present.
+   *
+   * @param t temporary location to remember
+   */
+  @VisibleForTesting
+  protected void storeTempDirLocation(Path t) {
+    String tmpCfg = conf.get(CRUNCH_TMP_DIRS);
+    String tmpDir = t.toString();
+
+    LOG.debug("Temporary directory created: {}", tmpDir);
+
+    if (tmpCfg == null) {
+      conf.set(CRUNCH_TMP_DIRS, tmpDir);
+      return;
+    }
+
+    // Compare whole entries by wrapping both sides in the separator; a plain substring test
+    // would treat ".../p1" as already stored once ".../p10" is in the list.
+    boolean alreadyStored = (":" + tmpCfg + ":").contains(":" + tmpDir + ":");
+    if (!alreadyStored) {
+      conf.set(CRUNCH_TMP_DIRS, tmpCfg + ":" + tmpDir);
+    }
   }
 
   private synchronized Path getTempDirectory() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index d23de3b..d31bfad 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -23,9 +23,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.crunch.Pipeline;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.crunch.Target;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.collect.DoTable;
@@ -39,9 +40,11 @@ import org.apache.crunch.impl.mr.run.CrunchOutputFormat;
 import org.apache.crunch.impl.mr.run.CrunchReducer;
 import org.apache.crunch.impl.mr.run.NodeContext;
 import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.util.DistCache;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 
@@ -51,11 +54,18 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 class JobPrototype {
 
+  private static final String DFS_REPLICATION = "dfs.replication";
+  private static final String DFS_REPLICATION_INITIAL = "dfs.replication.initial";
+  private static final String CRUNCH_TMP_DIR_REPLICATION = "crunch.tmp.dir.replication";
+
   public static JobPrototype createMapReduceJob(int jobID, PGroupedTableImpl<?, ?> group,
-      Set<NodePath> inputs, Path workingPath) {
+                                                Set<NodePath> inputs, Path workingPath) {
     return new JobPrototype(jobID, inputs, group, workingPath);
   }
 
@@ -84,6 +94,7 @@ class JobPrototype {
     this.targetsToNodePaths = null;
   }
 
+  @VisibleForTesting
   private JobPrototype(int jobID, HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
     this.jobID = jobID;
     this.group = null;
@@ -141,9 +152,13 @@ class JobPrototype {
     return job;
   }
 
+  private static final Logger LOG = LoggerFactory.getLogger(JobPrototype.class);
+
   private CrunchControlledJob build(
       Class<?> jarClass, Configuration conf, MRPipeline pipeline, int numOfJobs) throws IOException {
+
     Job job = new Job(conf);
+    LOG.debug(String.format("Replication factor: %s", job.getConfiguration().get(DFS_REPLICATION)));
     conf = job.getConfiguration();
     conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
     job.setJarByClass(jarClass);
@@ -152,19 +167,28 @@ class JobPrototype {
     Set<Target> allTargets = Sets.newHashSet();
     Path outputPath = new Path(workingPath, "output");
     MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null);
+
+    boolean onlyHasTemporaryOutput =true;
+
     for (Target target : targetsToNodePaths.keySet()) {
       DoNode node = null;
+      LOG.debug("Target path: " + target);
       for (NodePath nodePath : targetsToNodePaths.get(target)) {
         if (node == null) {
           PType<?> ptype = nodePath.tail().getPType();
           node = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype);
           outputHandler.configureNode(node, target);
+
+          onlyHasTemporaryOutput &= DistributedPipeline.isTempDir(job, target.toString());
         }
         outputNodes.add(walkPath(nodePath.descendingIterator(), node));
       }
       allTargets.add(target);
     }
 
+    setJobReplication(job.getConfiguration(), onlyHasTemporaryOutput);
+
+
     Set<DoNode> mapSideNodes = Sets.newHashSet();
     if (mapSideNodePaths != null) {
       for (Target target : mapSideNodePaths.keySet()) {
@@ -243,6 +267,110 @@ class JobPrototype {
         completionHook);
   }
 
+  /** Replication factor assumed when neither the configuration nor the file system provides one. */
+  private static final String FALLBACK_REPLICATION = "3";
+
+  /**
+   * Chooses the {@code dfs.replication} value for a job. Nothing is changed unless the user has
+   * set {@code crunch.tmp.dir.replication}. Otherwise the initial replication is recorded first
+   * (see {@link #handleInitialReplication(Configuration)}); a job that writes only temporary
+   * output then gets the user-supplied temporary replication, and any other job gets the
+   * recorded initial replication.
+   *
+   * @param jobConfiguration configuration of the job being built; modified in place
+   * @param onlyHasTemporaryOutput whether every output of the job is a temporary directory
+   */
+  @VisibleForTesting
+  protected void setJobReplication(Configuration jobConfiguration, boolean onlyHasTemporaryOutput) {
+    String userSuppliedTmpDirReplication = jobConfiguration.get(CRUNCH_TMP_DIR_REPLICATION);
+    if (userSuppliedTmpDirReplication == null) {
+      return;
+    }
+
+    handleInitialReplication(jobConfiguration);
+
+    if (onlyHasTemporaryOutput) {
+      LOG.debug("Setting replication factor to: {} ", userSuppliedTmpDirReplication);
+      jobConfiguration.set(DFS_REPLICATION, userSuppliedTmpDirReplication);
+    } else {
+      String originalReplication = jobConfiguration.get(DFS_REPLICATION_INITIAL);
+      LOG.debug("Using initial replication factor ({})", originalReplication);
+      jobConfiguration.set(DFS_REPLICATION, originalReplication);
+    }
+  }
+
+  /**
+   * Ensures {@code dfs.replication.initial} is set in the job configuration. An existing value
+   * is left alone. Otherwise the value is taken from {@code dfs.replication} in the job
+   * configuration when present, else from the file system of the first output target when that
+   * target is a {@link FileTargetImpl}, and finally falls back to {@code "3"} so that the
+   * stored value is never null.
+   *
+   * @param jobConfiguration configuration of the job being built; modified in place
+   */
+  @VisibleForTesting
+  protected void handleInitialReplication(Configuration jobConfiguration) {
+    String origReplication = jobConfiguration.get(DFS_REPLICATION_INITIAL);
+    if (origReplication != null) {
+      LOG.debug("Initial replication has been already set ({}); nothing to do.", origReplication);
+      return;
+    }
+
+    String defaultReplication = jobConfiguration.get(DFS_REPLICATION);
+    if (defaultReplication != null) {
+      LOG.debug("Using dfs.replication ({}) set by user as initial replication.",
+          defaultReplication);
+      setInitialJobReplicationConfig(jobConfiguration, defaultReplication);
+      return;
+    }
+
+    // No explicit setting: ask the file system of the first target, if it is file-based.
+    defaultReplication = FALLBACK_REPLICATION;
+    Set<Target> targets = targetsToNodePaths == null ? null : targetsToNodePaths.keySet();
+    if (targets != null && !targets.isEmpty()) {
+      Target t = targets.iterator().next();
+      if (t instanceof FileTargetImpl) {
+        Path path = ((FileTargetImpl) t).getPath();
+        defaultReplication =
+            tryGetDefaultReplicationFromFileSystem(jobConfiguration, path, FALLBACK_REPLICATION);
+      }
+    }
+
+    setInitialJobReplicationConfig(jobConfiguration, defaultReplication);
+  }
+
+  /**
+   * Reads {@code dfs.replication} from the configuration of the file system that owns
+   * {@code path}.
+   *
+   * @param jobConf configuration used to resolve the file system
+   * @param path location whose file system is queried
+   * @param defaultReplication value returned when the file system cannot be obtained or its
+   *        configuration has no {@code dfs.replication}
+   * @return the file system's {@code dfs.replication}, or {@code defaultReplication}
+   */
+  private String tryGetDefaultReplicationFromFileSystem(Configuration jobConf, Path path, String defaultReplication) {
+    try {
+      FileSystem fs = path.getFileSystem(jobConf);
+      String fromFs = fs.getConf().get(DFS_REPLICATION);
+      if (fromFs != null) {
+        LOG.debug(
+            "Using dfs.replication ({}) retrieved from remote filesystem as initial replication.", fromFs);
+        return fromFs;
+      }
+      LOG.debug("No dfs.replication in the file system's config. Setting initial replication to {}.",
+          defaultReplication);
+    } catch (IOException e) {
+      LOG.warn("Cannot read job's config. Setting initial replication to {}.", defaultReplication, e);
+    }
+    return defaultReplication;
+  }
+
+  /** Stores {@code defaultReplication} under {@code dfs.replication.initial} in {@code job}. */
+  private void setInitialJobReplicationConfig(Configuration job, String defaultReplication) {
+    job.set(DFS_REPLICATION_INITIAL, defaultReplication);
+  }
+
   private static CrunchControlledJob.Hook getHook(
       CrunchControlledJob.Hook base,
       List<CrunchControlledJob.Hook> optional) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java
new file mode 100644
index 0000000..40e4bf0
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the temporary-directory bookkeeping of {@link DistributedPipeline}: the static
+ * {@code isTempDir} check and {@code storeTempDirLocation}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DistributedPipelineTest {
+    private static final String TMP_DIRS_KEY = "crunch.tmp.dirs";
+
+    private final String testTempDirPath1 = "/tmp/crunch-1345424622/p1";
+    private final String testTempDirPath2 = "/tmp/crunch-1345424622/p2";
+
+    @Mock private Job mockJob;
+    @Mock private Path mockPath;
+    private Configuration testConfiguration = new Configuration();
+
+    /** Makes the mocked job expose the per-test configuration. */
+    @Before
+    public void setUp() {
+        when(mockJob.getConfiguration()).thenReturn(testConfiguration);
+    }
+
+    /** With no recorded temp dirs, no path is considered temporary. */
+    @Test
+    public void isTempDirFalseWhenCrunchCreatesNoDirs() {
+        Assert.assertFalse(DistributedPipeline.isTempDir(mockJob, testTempDirPath1));
+    }
+
+    /** A path equal to a recorded temp dir is temporary. */
+    @Test
+    public void isTempDirTrueWhenFileIsInTempDir() {
+        testConfiguration.set(TMP_DIRS_KEY, testTempDirPath1);
+        Assert.assertTrue(DistributedPipeline.isTempDir(mockJob, testTempDirPath1));
+    }
+
+    /** A path outside every recorded temp dir is not temporary. */
+    @Test
+    public void isTempDirFalseWhenFileIsNotInTempDir() {
+        testConfiguration.set(TMP_DIRS_KEY, testTempDirPath1);
+        Assert.assertFalse(DistributedPipeline.isTempDir(mockJob, "/user/crunch/iwTV2/"));
+    }
+
+    /** Stored temp dirs accumulate as a ':'-separated list without duplicates. */
+    @Test
+    public void tempDirsAreStoredInPipelineConf() {
+        DistributedPipeline distributedPipeline = Mockito.mock(DistributedPipeline.class, Mockito.CALLS_REAL_METHODS);
+        Configuration pipelineConf = new Configuration();
+        distributedPipeline.setConfiguration(pipelineConf);
+        String bothDirs = testTempDirPath1 + ":" + testTempDirPath2;
+
+        // no temp directory is present at startup
+        Assert.assertNull(distributedPipeline.getConfiguration().get(TMP_DIRS_KEY));
+
+        // store a temp directory
+        distributedPipeline.storeTempDirLocation(new Path(testTempDirPath1));
+        Assert.assertEquals(testTempDirPath1, distributedPipeline.getConfiguration().get(TMP_DIRS_KEY));
+
+        // store one more temp directory
+        distributedPipeline.storeTempDirLocation(new Path(testTempDirPath2));
+        Assert.assertEquals(bothDirs, distributedPipeline.getConfiguration().get(TMP_DIRS_KEY));
+
+        // try to store the first temp directory again, not added again
+        distributedPipeline.storeTempDirLocation(new Path(testTempDirPath1));
+        Assert.assertEquals(bothDirs, distributedPipeline.getConfiguration().get(TMP_DIRS_KEY));
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java
new file mode 100644
index 0000000..44da01a
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import com.google.common.collect.HashMultimap;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
+/**
+ * Unit tests for how {@link JobPrototype} picks {@code dfs.replication} and records
+ * {@code dfs.replication.initial}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class JobPrototypeTest {
+
+  public static final String DFS_REPLICATION = "dfs.replication";
+  public static final String DFS_REPLICATION_INITIAL = "dfs.replication.initial";
+  public static final String TEST_INITIAL_DFS_REPLICATION = "42";
+  public static final String TEST_TMP_DIR_REPLICATION = "1";
+
+  @Mock private Path mockPath;
+  @Mock private FileTargetImpl mockTarget;
+  @Mock private FileSystem mockFs;
+  @Mock private DoNode mockNode;
+  @Mock private PGroupedTableImpl<String, String> mockPgroup;
+  @Mock private Set<NodePath> mockInputs;
+  private JobPrototype jobPrototypeUnderTest;
+  private Configuration testConfiguration = new Configuration();
+
+  /** Pre-sets both replication properties and builds a fresh prototype for every test. */
+  @Before
+  public void setUp() {
+    testConfiguration.set(DFS_REPLICATION_INITIAL, TEST_INITIAL_DFS_REPLICATION);
+    testConfiguration.set("crunch.tmp.dir.replication", TEST_TMP_DIR_REPLICATION);
+    doReturn(new Object[]{}).when(mockInputs).toArray();
+    jobPrototypeUnderTest = JobPrototype.createMapReduceJob(42, mockPgroup, mockInputs, mockPath);
+  }
+
+  /** A job with only temporary output gets the user-defined temporary replication. */
+  @Test
+  public void userDefinedTmpDirReplicationFactorSetForIntermediateTargets() {
+    jobPrototypeUnderTest.setJobReplication(testConfiguration, true);
+
+    assertEquals(TEST_TMP_DIR_REPLICATION, testConfiguration.get(DFS_REPLICATION));
+  }
+
+  /** A job with non-temporary output gets the recorded initial replication. */
+  @Test
+  public void initialReplicationFactorSetForLeafOutputTargets() {
+    jobPrototypeUnderTest.setJobReplication(testConfiguration, false);
+
+    assertEquals(TEST_INITIAL_DFS_REPLICATION, testConfiguration.get(DFS_REPLICATION));
+  }
+
+  /** An initial replication that is already present is left untouched. */
+  @Test
+  public void initialReplicationFactorSetIfUserSpecified() {
+    jobPrototypeUnderTest.handleInitialReplication(testConfiguration);
+
+    assertEquals(TEST_INITIAL_DFS_REPLICATION, testConfiguration.get(DFS_REPLICATION_INITIAL));
+  }
+
+  /** Without any configured value, the target file system's dfs.replication is recorded. */
+  @Test
+  public void initialReplicationFactorUsedFromFileSystem() throws IOException {
+    testConfiguration = new Configuration();
+    HashMultimap<Target, NodePath> targetNodePaths = HashMultimap.create();
+    targetNodePaths.put(mockTarget, new NodePath());
+    doReturn(mockPath).when(mockTarget).getPath();
+    doReturn(mockFs).when(mockPath).getFileSystem(any(Configuration.class));
+    Configuration fsConf = new Configuration();
+    fsConf.set(DFS_REPLICATION, TEST_INITIAL_DFS_REPLICATION);
+    doReturn(fsConf).when(mockFs).getConf();
+    jobPrototypeUnderTest.addReducePaths(targetNodePaths);
+
+    handleInitialReplicationAndAssert(TEST_INITIAL_DFS_REPLICATION);
+  }
+
+  /** When the file system cannot be obtained, the default of "3" is recorded. */
+  @Test
+  public void initialReplicationFactorUsedWhenItCannotBeRetrievedFromFileSystem() throws IOException {
+    testConfiguration = new Configuration();
+    HashMultimap<Target, NodePath> targetNodePaths = HashMultimap.create();
+    targetNodePaths.put(mockTarget, new NodePath());
+    doReturn(mockPath).when(mockTarget).getPath();
+    doThrow(new IOException()).when(mockPath).getFileSystem(any(Configuration.class));
+    jobPrototypeUnderTest.addReducePaths(targetNodePaths);
+
+    handleInitialReplicationAndAssert("3");  //default
+  }
+
+  /** Runs handleInitialReplication on the test configuration and checks the recorded value. */
+  private void handleInitialReplicationAndAssert(String expected) {
+    jobPrototypeUnderTest.handleInitialReplication(testConfiguration);
+    assertEquals(expected, testConfiguration.get(DFS_REPLICATION_INITIAL));
+  }
+}