You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ks...@apache.org on 2018/02/09 20:05:24 UTC

tez git commit: TEZ-3894. Tez intermediate outputs implicitly rely on permissive umask for shuffle (Jason Lowe via kshukla)

Repository: tez
Updated Branches:
  refs/heads/master 96c988cff -> a1f2da8eb


TEZ-3894. Tez intermediate outputs implicitly rely on permissive umask for shuffle (Jason Lowe via kshukla)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a1f2da8e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a1f2da8e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a1f2da8e

Branch: refs/heads/master
Commit: a1f2da8eb319218ff2a6dbe0f6de911336ac7e45
Parents: 96c988c
Author: Kuhu Shukla <ks...@yahoo-inc.com>
Authored: Fri Feb 9 13:54:11 2018 -0600
Committer: Kuhu Shukla <ks...@yahoo-inc.com>
Committed: Fri Feb 9 13:54:11 2018 -0600

----------------------------------------------------------------------
 .../common/sort/impl/PipelinedSorter.java       | 12 +++++
 .../common/sort/impl/TezSpillRecord.java        |  5 ++
 .../common/sort/impl/dflt/DefaultSorter.java    | 12 +++++
 .../writers/UnorderedPartitionedKVWriter.java   | 12 +++++
 .../common/sort/impl/TestPipelinedSorter.java   | 57 +++++++++++---------
 .../sort/impl/dflt/TestDefaultSorter.java       | 25 ++++++++-
 .../TestUnorderedPartitionedKVWriter.java       | 24 ++++++++-
 7 files changed, 119 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index c4782f6..7915662 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -38,6 +38,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
@@ -68,6 +69,8 @@ import org.apache.tez.util.StopWatch;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS;
+
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class PipelinedSorter extends ExternalSorter {
   
@@ -479,6 +482,9 @@ public class PipelinedSorter extends ExternalSorter {
             * MAP_OUTPUT_INDEX_RECORD_LENGTH);
     spillFilePaths.put(numSpills, filename);
     FSDataOutputStream out = rfs.create(filename, true, 4096);
+    if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
+      rfs.setPermission(filename, SPILL_FILE_PERMS);
+    }
 
     try {
       LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString() +
@@ -564,6 +570,9 @@ public class PipelinedSorter extends ExternalSorter {
         mapOutputFile.getSpillFileForWrite(numSpills, size);
       spillFilePaths.put(numSpills, filename);
       out = rfs.create(filename, true, 4096);
+      if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
+        rfs.setPermission(filename, SPILL_FILE_PERMS);
+      }
       LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString());
       for (int i = 0; i < partitions; ++i) {
         if (isThreadInterrupted()) {
@@ -749,6 +758,9 @@ public class PipelinedSorter extends ExternalSorter {
       }
       //The output stream for the final single output file
       FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+      if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
+        rfs.setPermission(finalOutputFile, SPILL_FILE_PERMS);
+      }
 
       final TezSpillRecord spillRec = new TezSpillRecord(partitions);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
index ab4142b..48bd211 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
@@ -30,11 +30,13 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.tez.runtime.library.common.Constants;
 
 public class TezSpillRecord {
+  public static final FsPermission SPILL_FILE_PERMS = new FsPermission((short) 0640);
 
   /** Backing store */
   private final ByteBuffer buf;
@@ -140,6 +142,9 @@ public class TezSpillRecord {
       } else {
         out.close();
       }
+      if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(job)))) {
+        rfs.setPermission(loc, SPILL_FILE_PERMS);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 85e0003..cfcbd56 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -32,6 +32,7 @@ import java.util.zip.Deflater;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
@@ -63,6 +64,8 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
 
 import com.google.common.base.Preconditions;
 
+import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS;
+
 @SuppressWarnings({"unchecked", "rawtypes"})
 public final class DefaultSorter extends ExternalSorter implements IndexedSortable {
   
@@ -893,6 +896,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
           mapOutputFile.getSpillFileForWrite(numSpills, size);
       spillFilePaths.put(numSpills, filename);
       out = rfs.create(filename);
+      if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
+        rfs.setPermission(filename, SPILL_FILE_PERMS);
+      }
 
       int spindex = mstart;
       final InMemValBytes value = createInMemValBytes();
@@ -1000,6 +1006,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
           mapOutputFile.getSpillFileForWrite(numSpills, size);
       spillFilePaths.put(numSpills, filename);
       out = rfs.create(filename);
+      if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
+        rfs.setPermission(filename, SPILL_FILE_PERMS);
+      }
 
       // we don't run the combiner for a single record
       for (int i = 0; i < partitions; ++i) {
@@ -1273,6 +1282,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
 
     //The output stream for the final single output file
     FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+    if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
+      rfs.setPermission(finalOutputFile, SPILL_FILE_PERMS);
+    }
 
     if (numSpills == 0) {
       // TODO Change event generation to say there is no data rather than generating a dummy file

http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index f4ebc97..b9f0edf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.common.CallableWithNdc;
@@ -83,6 +84,8 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
 
+import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS;
+
 public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWriter {
 
   private static final Logger LOG = LoggerFactory.getLogger(UnorderedPartitionedKVWriter.class);
@@ -588,6 +591,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         this.spillIndex = spillPathDetails.spillIndex;
       }
       FSDataOutputStream out = rfs.create(spillPathDetails.outputFilePath);
+      if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
+        rfs.setPermission(spillPathDetails.outputFilePath, SPILL_FILE_PERMS);
+      }
       TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
       DataInputBuffer key = new DataInputBuffer();
       DataInputBuffer val = new DataInputBuffer();
@@ -984,6 +990,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     FSDataOutputStream out = null;
     try {
       out = rfs.create(finalOutPath);
+      if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
+        rfs.setPermission(finalOutPath, SPILL_FILE_PERMS);
+      }
       Writer writer = null;
 
       for (int i = 0; i < numPartitions; i++) {
@@ -1072,6 +1081,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       final TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
       final Path outPath = spillPathDetails.outputFilePath;
       out = rfs.create(outPath);
+      if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
+        rfs.setPermission(outPath, SPILL_FILE_PERMS);
+      }
       BitSet emptyPartitions = null;
       if (pipelinedShuffle || !isFinalMergeEnabled) {
         emptyPartitions = new BitSet(numPartitions);

http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index d6f6273..727f8ac 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -21,7 +21,9 @@ package org.apache.tez.runtime.library.common.sort.impl;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -43,6 +45,7 @@ import org.apache.tez.runtime.api.TaskContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
+import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
@@ -70,8 +73,10 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 public class TestPipelinedSorter {
+  private static Configuration conf;
   private static FileSystem localFs = null;
   private static Path workDir = null;
+  private static LocalDirAllocator dirAllocator;
   private OutputContext outputContext;
 
   private int numOutputs;
@@ -81,13 +86,14 @@ public class TestPipelinedSorter {
   private static TreeMap<String, String> sortedDataMap = Maps.newTreeMap();
 
   static {
-    Configuration conf = getConf();
+    conf = getConf();
     try {
       localFs = FileSystem.getLocal(conf);
       workDir = new Path(
           new Path(System.getProperty("test.build.data", "/tmp")),
           TestPipelinedSorter.class.getName())
           .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+      dirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -100,10 +106,11 @@ public class TestPipelinedSorter {
 
   @Before
   public void setup() throws IOException {
+    conf = getConf();
     ApplicationId appId = ApplicationId.newInstance(10000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
-    String auxiliaryService = getConf().get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+    String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
         TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
     this.outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
   }
@@ -111,6 +118,7 @@ public class TestPipelinedSorter {
   public static Configuration getConf() {
     Configuration conf = new Configuration();
     conf.set("fs.defaultFS", "file:///");
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     //To enable PipelinedSorter
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name());
 
@@ -139,15 +147,13 @@ public class TestPipelinedSorter {
     //TODO: need to support multiple partition testing later
 
     //# partition, # of keys, size per key, InitialMem, blockSize
-    Configuration conf = getConf();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
     basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
-
+    verifyOutputPermissions(outputContext.getUniqueIdentifier());
   }
 
   @Test
   public void testWithoutPartitionStats() throws IOException {
-    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, false);
     //# partition, # of keys, size per key, InitialMem, blockSize
     basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20);
@@ -156,7 +162,6 @@ public class TestPipelinedSorter {
 
   @Test
   public void testWithEmptyData() throws IOException {
-    Configuration conf = getConf();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
     //# partition, # of keys, size per key, InitialMem, blockSize
     basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20);
@@ -166,7 +171,6 @@ public class TestPipelinedSorter {
   public void testEmptyDataWithPipelinedShuffle() throws IOException {
     this.numOutputs = 1;
     this.initialAvailableMem = 1 *1024 * 1024;
-    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     conf.setInt(TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
@@ -207,7 +211,6 @@ public class TestPipelinedSorter {
     int partitions = 50;
     this.numOutputs = partitions;
     this.initialAvailableMem = 1 *1024 * 1024;
-    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, sendEmptyPartitionDetails);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     conf.setInt(TezRuntimeConfiguration
@@ -222,6 +225,7 @@ public class TestPipelinedSorter {
       assertTrue(sorter.getNumSpills() == numKeys + 1);
     }
     verifyCounters(sorter, outputContext);
+    verifyOutputPermissions(outputContext.getUniqueIdentifier());
     Path indexFile = sorter.getFinalIndexFile();
     TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
     for (int i = 0; i < partitions; i++) {
@@ -264,7 +268,6 @@ public class TestPipelinedSorter {
 
   @Test
   public void testExceedsKVWithMultiplePartitions() throws IOException {
-    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     this.numOutputs = 5;
     this.initialAvailableMem = 1 * 1024 * 1024;
@@ -275,13 +278,13 @@ public class TestPipelinedSorter {
 
     writeData(sorter, 100, 1<<20);
     verifyCounters(sorter, outputContext);
+    verifyOutputPermissions(outputContext.getUniqueIdentifier());
   }
 
   @Test
   public void testExceedsKVWithPipelinedShuffle() throws IOException {
     this.numOutputs = 1;
     this.initialAvailableMem = 1 *1024 * 1024;
-    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     conf.setInt(TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
@@ -301,7 +304,6 @@ public class TestPipelinedSorter {
   public void test_TEZ_2602_50mb() throws IOException {
     this.numOutputs = 1;
     this.initialAvailableMem = 1 *1024 * 1024;
-    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     conf.setInt(TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
@@ -318,13 +320,13 @@ public class TestPipelinedSorter {
 
     sorter.flush();
     sorter.close();
+    verifyOutputPermissions(outputContext.getUniqueIdentifier());
   }
 
   //@Test
   public void testLargeDataWithMixedKV() throws IOException {
     this.numOutputs = 1;
     this.initialAvailableMem = 48 *1024 * 1024;
-    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
         initialAvailableMem);
@@ -346,6 +348,7 @@ public class TestPipelinedSorter {
 
     sorter.flush();
     sorter.close();
+    verifyOutputPermissions(outputContext.getUniqueIdentifier());
   }
 
 
@@ -382,7 +385,6 @@ public class TestPipelinedSorter {
   @Test
   public void testWithCustomComparator() throws IOException {
     //Test with custom comparator
-    Configuration conf = getConf();
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
         CustomComparator.class.getName());
     basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20);
@@ -392,7 +394,6 @@ public class TestPipelinedSorter {
   public void testWithPipelinedShuffle() throws IOException {
     this.numOutputs = 1;
     this.initialAvailableMem = 5 *1024 * 1024;
-    Configuration conf = getConf();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     conf.setInt(TezRuntimeConfiguration
@@ -411,7 +412,6 @@ public class TestPipelinedSorter {
 
   @Test
   public void testCountersWithMultiplePartitions() throws IOException {
-    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     this.numOutputs = 5;
     this.initialAvailableMem = 5 * 1024 * 1024;
@@ -422,11 +422,11 @@ public class TestPipelinedSorter {
 
     writeData(sorter, 10000, 100);
     verifyCounters(sorter, outputContext);
+    verifyOutputPermissions(outputContext.getUniqueIdentifier());
   }
 
   @Test
   public void testMultipleSpills() throws IOException {
-    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     this.numOutputs = 5;
     this.initialAvailableMem = 5 * 1024 * 1024;
@@ -438,11 +438,11 @@ public class TestPipelinedSorter {
     writeData(sorter, 25000, 1000);
     assertFalse("Expecting needsRLE to be false", sorter.needsRLE());
     verifyCounters(sorter, outputContext);
+    verifyOutputPermissions(outputContext.getUniqueIdentifier());
   }
 
   @Test
   public void testWithCombiner() throws IOException {
-    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, DummyCombiner.class.getName());
     this.numOutputs = 5;
@@ -461,6 +461,7 @@ public class TestPipelinedSorter {
     reader.close();
 
     verifyCounters(sorter, outputContext);
+    verifyOutputPermissions(outputContext.getUniqueIdentifier());
   }
 
   // for testWithCombiner
@@ -479,7 +480,6 @@ public class TestPipelinedSorter {
 
   @Test
   public void testMultipleSpills_WithRLE() throws IOException {
-    Configuration conf = getConf();
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     this.numOutputs = 5;
     this.initialAvailableMem = 5 * 1024 * 1024;
@@ -491,12 +491,12 @@ public class TestPipelinedSorter {
     writeSimilarKeys(sorter, 25000, 1000, true);
     assertTrue("Expecting needsRLE to be true", sorter.needsRLE());
     verifyCounters(sorter, outputContext);
+    verifyOutputPermissions(outputContext.getUniqueIdentifier());
   }
 
   public void basicTest2(int partitions, int[] numkeys, int[] keysize,
       long initialAvailableMem, int  blockSize) throws IOException {
     this.numOutputs = partitions; // single output
-    Configuration conf = getConf();
     conf.setInt(TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 100);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
@@ -520,12 +520,12 @@ public class TestPipelinedSorter {
     }
     sorter.flush();
     sorter.close();
+    verifyOutputPermissions(outputContext.getUniqueIdentifier());
   }
 
   public void basicTest(int partitions, int numKeys, int keySize,
       long initialAvailableMem, int minBlockSize) throws IOException {
     this.numOutputs = partitions; // single output
-    Configuration conf = getConf();
     conf.setInt(TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, minBlockSize >> 20);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
@@ -543,6 +543,7 @@ public class TestPipelinedSorter {
     }
 
     verifyCounters(sorter, outputContext);
+    verifyOutputPermissions(outputContext.getUniqueIdentifier());
     Path outputFile = sorter.finalOutputFile;
     FileSystem fs = outputFile.getFileSystem(conf);
     TezCounter finalOutputBytes =
@@ -596,7 +597,6 @@ public class TestPipelinedSorter {
   //Its not possible to allocate > 2 GB in test environment.  Carry out basic checks here.
   public void memTest() throws IOException {
     //Verify if > 2 GB can be set via config
-    Configuration conf = getConf();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3076);
     long size = ExternalSorter.getInitialMemoryRequirement(conf, 4096 * 1024 * 1024l);
     Assert.assertTrue(size == (3076l << 20));
@@ -681,7 +681,6 @@ public class TestPipelinedSorter {
   //Intentionally not having timeout
   public void test_without_lazyMemAllocation() throws IOException {
     this.numOutputs = 10;
-    Configuration conf = getConf();
 
     //128 MB. Pre-allocate. Request for default block size. Get 1 buffer
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
@@ -722,7 +721,6 @@ public class TestPipelinedSorter {
   //Intentionally not having timeout
   public void test_with_lazyMemAllocation() throws IOException {
     this.numOutputs = 10;
-    Configuration conf = getConf();
 
     //128 MB. Do not pre-allocate.
     // Get 32 MB buffer first and the another buffer with 96 on filling up
@@ -782,7 +780,6 @@ public class TestPipelinedSorter {
   //Intentionally not having timeout
   public void testLazyAllocateMem() throws IOException {
     this.numOutputs = 10;
-    Configuration conf = getConf();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
     conf.setBoolean(TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
@@ -830,6 +827,17 @@ public class TestPipelinedSorter {
     basicTest(1, 5, (2 << 20), (48 * 1024l * 1024l), 16 << 20);
   }
 
+  private void verifyOutputPermissions(String spillId) throws IOException {
+    String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId
+        + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
+    Path outputPath = dirAllocator.getLocalPathToRead(subpath, conf);
+    Path indexPath = dirAllocator.getLocalPathToRead(subpath + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING, conf);
+    Assert.assertEquals("Incorrect output permissions", (short)0640,
+        localFs.getFileStatus(outputPath).getPermission().toShort());
+    Assert.assertEquals("Incorrect index permissions", (short)0640,
+        localFs.getFileStatus(indexPath).getPermission().toShort());
+  }
+
   private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {
     writeData(sorter, numKeys, keyLen, true);
   }
@@ -880,7 +888,6 @@ public class TestPipelinedSorter {
     Text readValue = new Text();
     DataInputBuffer keyIn = new DataInputBuffer();
     DataInputBuffer valIn = new DataInputBuffer();
-    Configuration conf = getConf();
     SerializationFactory serializationFactory = new SerializationFactory(conf);
     Deserializer<Text> keyDeserializer = serializationFactory.getDeserializer(Text.class);
     Deserializer<Text> valDeserializer = serializationFactory.getDeserializer(Text.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 444ebaf..aad232a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.library.common.sort.impl.dflt;
 
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tez.runtime.library.common.Constants;
 import org.junit.Assert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -42,6 +44,7 @@ import com.google.protobuf.ByteString;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -84,16 +87,19 @@ import org.mockito.stubbing.Answer;
 
 public class TestDefaultSorter {
 
-  private Configuration conf;
   private static final int PORT = 80;
   private static final String UniqueID = "UUID";
 
   private static FileSystem localFs = null;
   private static Path workingDir = null;
 
+  private Configuration conf;
+  private LocalDirAllocator dirAllocator;
+
   @Before
   public void setup() throws IOException {
     conf = new Configuration();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.LEGACY.name()); // DefaultSorter
     conf.set("fs.defaultFS", "file:///");
     localFs = FileSystem.getLocal(conf);
@@ -108,6 +114,7 @@ public class TestDefaultSorter {
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
         HashPartitioner.class.getName());
     conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+    dirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
   }
 
   @AfterClass
@@ -272,6 +279,8 @@ public class TestDefaultSorter {
     } catch(IOException ioe) {
       fail(ioe.getMessage());
     }
+
+    verifyOutputPermissions(context.getUniqueIdentifier());
   }
 
   @Test(timeout = 30000)
@@ -396,6 +405,7 @@ public class TestDefaultSorter {
       assertTrue(sorter.getNumSpills() == numKeys);
     }
     verifyCounters(sorter, context);
+    verifyOutputPermissions(context.getUniqueIdentifier());
     if (sorter.indexCacheList.size() != 0) {
       for (int i = 0; i < sorter.getNumSpills(); i++) {
         TezSpillRecord record = sorter.indexCacheList.get(i);
@@ -482,6 +492,7 @@ public class TestDefaultSorter {
         ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = ShuffleUserPayloads
             .DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
         assertTrue(shufflePayload.getPathComponent().equalsIgnoreCase(UniqueID + "_0"));
+        verifyOutputPermissions(shufflePayload.getPathComponent());
       }
     }
 
@@ -513,6 +524,7 @@ public class TestDefaultSorter {
         ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = ShuffleUserPayloads
             .DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
         assertTrue(shufflePayload.getPathComponent().equalsIgnoreCase(UniqueID + "_" + spillIndex));
+        verifyOutputPermissions(shufflePayload.getPathComponent());
         spillIndex++;
       }
     }
@@ -520,6 +532,17 @@ public class TestDefaultSorter {
     verifyCounters(sorter, context);
   }
 
+  private void verifyOutputPermissions(String spillId) throws IOException {
+    String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId
+        + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
+    Path outputPath = dirAllocator.getLocalPathToRead(subpath, conf);
+    Path indexPath = dirAllocator.getLocalPathToRead(subpath + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING, conf);
+    Assert.assertEquals("Incorrect output permissions", (short)0640,
+        localFs.getFileStatus(outputPath).getPermission().toShort());
+    Assert.assertEquals("Incorrect index permissions", (short)0640,
+        localFs.getFileStatus(indexPath).getPermission().toShort());
+  }
+
   private void verifyCounters(DefaultSorter sorter, OutputContext context) {
     TezCounter numShuffleChunks = context.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
     TezCounter additionalSpills = context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);

http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index ae396cb..dfd807b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -51,9 +51,11 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter.SpillInfo;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
@@ -510,6 +512,10 @@ public class TestUnorderedPartitionedKVWriter {
     if (numRecordsWritten > 0) {
       assertTrue(localFs.exists(outputFilePath));
       assertTrue(localFs.exists(spillFilePath));
+      assertEquals("Incorrect output permissions", (short)0640,
+          localFs.getFileStatus(outputFilePath).getPermission().toShort());
+      assertEquals("Incorrect index permissions", (short)0640,
+          localFs.getFileStatus(spillFilePath).getPermission().toShort());
     } else {
       return;
     }
@@ -794,8 +800,14 @@ public class TestUnorderedPartitionedKVWriter {
     if (numRecordsWritten > 0) {
       int numSpills = kvWriter.numSpills.get();
       for (int i = 0; i < numSpills; i++) {
-        assertTrue(localFs.exists(taskOutput.getSpillFileForWrite(i, 10)));
-        assertTrue(localFs.exists(taskOutput.getSpillIndexFileForWrite(i, 10)));
+        Path outputFile = taskOutput.getSpillFileForWrite(i, 10);
+        Path indexFile = taskOutput.getSpillIndexFileForWrite(i, 10);
+        assertTrue(localFs.exists(outputFile));
+        assertTrue(localFs.exists(indexFile));
+        assertEquals("Incorrect output permissions", (short)0640,
+            localFs.getFileStatus(outputFile).getPermission().toShort());
+        assertEquals("Incorrect index permissions", (short)0640,
+            localFs.getFileStatus(indexFile).getPermission().toShort());
       }
     } else {
       return;
@@ -1042,6 +1054,13 @@ public class TestUnorderedPartitionedKVWriter {
         assertEquals(2, matcher.groupCount());
         assertEquals(uniqueId, matcher.group(1));
         assertTrue("spill id should be present in path component", matcher.group(2) != null);
+        Path outputPath = new Path(outputContext.getWorkDirs()[0],
+            "output/" + eventProto.getPathComponent() + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+        Path indexPath = outputPath.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+        assertEquals("Incorrect output permissions", (short)0640,
+            localFs.getFileStatus(outputPath).getPermission().toShort());
+        assertEquals("Incorrect index permissions", (short)0640,
+            localFs.getFileStatus(indexPath).getPermission().toShort());
       } else {
         assertEquals(0, eventProto.getSpillId());
         if (outputRecordsCounter.getValue() > 0) {
@@ -1341,6 +1360,7 @@ public class TestUnorderedPartitionedKVWriter {
       boolean shouldCompress, int maxSingleBufferSizeBytes,
       Class<? extends Partitioner> partitionerClass) {
     Configuration conf = new Configuration(false);
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, outputContext.getWorkDirs());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClass.getName());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valClass.getName());