Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/04 00:37:42 UTC

[1/9] hadoop git commit: HDFS-8270. create() always retried with hardcoded timeout when file already exists with open lease (Contributed by J.Andreina)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6.1 ae0fac3ef -> 3bd9b7459


HDFS-8270. create() always retried with hardcoded timeout when file already exists with open lease (Contributed by J.Andreina)

(cherry picked from commit 54f83d9bd917e8641e902c5f0695e65ded472f9a)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java

(cherry picked from commit 8090a6ee63c414ac5b76c21df1f6b5a188e873d6)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

(cherry picked from commit 04a7000c8d955a134d86967f4b4622ea920e1ea7)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a2434dcf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a2434dcf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a2434dcf

Branch: refs/heads/branch-2.6.1
Commit: a2434dcf2a4a6e436d7472ce9d7ee4830808ed2d
Parents: ae0fac3
Author: Vinayakumar B <vi...@apache.org>
Authored: Wed Jun 3 12:11:46 2015 +0530
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 14:06:09 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt          |  3 +++
 .../java/org/apache/hadoop/hdfs/NameNodeProxies.java | 15 ---------------
 .../org/apache/hadoop/hdfs/TestFileCreation.java     |  3 +--
 3 files changed, 4 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2434dcf/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 68adff8..759a20f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -150,6 +150,9 @@ Release 2.6.1 - UNRELEASED
     HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits.
     (Ming Ma via jing9)
 
+    HDFS-8270. create() always retried with hardcoded timeout when file already
+    exists with open lease (J.Andreina via vinayakumarb)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2434dcf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index b261220..c7e2cf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -42,7 +42,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
@@ -68,7 +67,6 @@ import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
@@ -425,22 +423,9 @@ public class NameNodeProxies {
 
     if (withRetries) { // create the proxy with retries
 
-      RetryPolicy createPolicy = RetryPolicies
-          .retryUpToMaximumCountWithFixedSleep(5,
-              HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
-    
-      Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap 
-                 = new HashMap<Class<? extends Exception>, RetryPolicy>();
-      remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
-          createPolicy);
-
-      RetryPolicy methodPolicy = RetryPolicies.retryByRemoteException(
-          defaultPolicy, remoteExceptionToPolicyMap);
       Map<String, RetryPolicy> methodNameToPolicyMap 
                  = new HashMap<String, RetryPolicy>();
     
-      methodNameToPolicyMap.put("create", methodPolicy);
-
       ClientProtocol translatorProxy =
         new ClientNamenodeProtocolTranslatorPB(proxy);
       return (ClientProtocol) RetryProxy.create(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2434dcf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
index 3a399f3..8e88b62 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
@@ -408,9 +408,8 @@ public class TestFileCreation {
         GenericTestUtils.assertExceptionContains("already being created by",
             abce);
       }
-      // NameNodeProxies' createNNProxyWithClientProtocol has 5 retries.
       assertCounter("AlreadyBeingCreatedExceptionNumOps",
-          6L, getMetrics(metricsName));
+          1L, getMetrics(metricsName));
       FSDataOutputStream stm2 = fs2.create(p, true);
       stm2.write(2);
       stm2.close();
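
For callers, the practical effect of HDFS-8270 is that create() no longer retries AlreadyBeingCreatedException inside NameNodeProxies (previously 5 attempts with the lease soft-limit period as a hardcoded fixed sleep). An application that still wants to wait out another writer's open lease now has to retry on its own. The following is a minimal, illustrative sketch only, not part of the patch; the attempt count and sleep interval are arbitrary example values.

  // Illustrative caller-side retry after HDFS-8270; attempt count and sleep
  // are example values, not Hadoop defaults.
  import java.io.IOException;

  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
  import org.apache.hadoop.ipc.RemoteException;

  public class CreateWithRetryExample {
    static FSDataOutputStream createWithRetry(FileSystem fs, Path p)
        throws IOException, InterruptedException {
      final int maxAttempts = 5;        // example value
      final long sleepMs = 10 * 1000L;  // example value
      for (int attempt = 1; ; attempt++) {
        try {
          return fs.create(p, true);
        } catch (IOException e) {
          // The exception may arrive unwrapped or wrapped in a RemoteException,
          // depending on the client code path; handle both.
          IOException cause = (e instanceof RemoteException)
              ? ((RemoteException) e).unwrapRemoteException(
                    AlreadyBeingCreatedException.class)
              : e;
          if (!(cause instanceof AlreadyBeingCreatedException)
              || attempt >= maxAttempts) {
            throw cause;
          }
          Thread.sleep(sleepMs); // wait for the other writer's lease to be released
        }
      }
    }
  }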


[3/9] hadoop git commit: HADOOP-7139. Allow appending to existing SequenceFiles (Contributed by kanaka kumar avvaru)

Posted by vi...@apache.org.
HADOOP-7139. Allow appending to existing SequenceFiles (Contributed by kanaka kumar avvaru)

(cherry picked from commit 295d678be8853a52c3ec3da43d9265478d6632b3)
(cherry picked from commit 80697e4f324948ec32b4cad3faccba55287be652)
(cherry picked from commit b3546b60340e085c5abd8f8f0990d45c7445fe07)

Conflicts:
	hadoop-common-project/hadoop-common/CHANGES.txt

(cherry picked from commit e9c8d8c58516aa64589cd44e9e5dd0a23ba72a17)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4f53c98c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4f53c98c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4f53c98c

Branch: refs/heads/branch-2.6.1
Commit: 4f53c98ca4b6fa4b75935e743df7aae6b54366ce
Parents: 193d8d3
Author: Vinayakumar B <vi...@apache.org>
Authored: Thu Jun 18 14:39:00 2015 +0530
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 14:15:23 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../java/org/apache/hadoop/io/SequenceFile.java |  85 ++++-
 .../hadoop/io/TestSequenceFileAppend.java       | 311 +++++++++++++++++++
 3 files changed, 394 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 107a95a..c3d18a1 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -8,6 +8,9 @@ Release 2.6.1 - UNRELEASED
 
   IMPROVEMENTS
 
+    HADOOP-7139. Allow appending to existing SequenceFiles
+    (kanaka kumar avvaru via vinayakumarb)
+
   OPTIMIZATIONS
 
     HADOOP-11238. Update the NameNode's Group Cache in the background when

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
index 35fc130..153856d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
@@ -835,7 +835,9 @@ public class SequenceFile {
     DataOutputStream deflateOut = null;
     Metadata metadata = null;
     Compressor compressor = null;
-    
+
+    private boolean appendMode = false;
+
     protected Serializer keySerializer;
     protected Serializer uncompressedValSerializer;
     protected Serializer compressedValSerializer;
@@ -907,6 +909,13 @@ public class SequenceFile {
       }
     }
 
+    static class AppendIfExistsOption extends Options.BooleanOption implements
+        Option {
+      AppendIfExistsOption(boolean value) {
+        super(value);
+      }
+    }
+
     static class KeyClassOption extends Options.ClassOption implements Option {
       KeyClassOption(Class<?> value) {
         super(value);
@@ -956,7 +965,7 @@ public class SequenceFile {
         return codec;
       }
     }
-    
+
     public static Option file(Path value) {
       return new FileOption(value);
     }
@@ -982,6 +991,10 @@ public class SequenceFile {
       return new ReplicationOption(value);
     }
     
+    public static Option appendIfExists(boolean value) {
+      return new AppendIfExistsOption(value);
+    }
+
     public static Option blockSize(long value) {
       return new BlockSizeOption(value);
     }
@@ -1028,6 +1041,8 @@ public class SequenceFile {
       ProgressableOption progressOption = 
         Options.getOption(ProgressableOption.class, opts);
       FileOption fileOption = Options.getOption(FileOption.class, opts);
+      AppendIfExistsOption appendIfExistsOption = Options.getOption(
+          AppendIfExistsOption.class, opts);
       FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
       StreamOption streamOption = Options.getOption(StreamOption.class, opts);
       KeyClassOption keyClassOption = 
@@ -1069,7 +1084,54 @@ public class SequenceFile {
           blockSizeOption.getValue();
         Progressable progress = progressOption == null ? null :
           progressOption.getValue();
-        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
+
+        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
+            && fs.exists(p)) {
+
+          // Read the file and verify header details
+          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
+              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
+          try {
+
+            if (keyClassOption.getValue() != reader.getKeyClass()
+                || valueClassOption.getValue() != reader.getValueClass()) {
+              throw new IllegalArgumentException(
+                  "Key/value class provided does not match the file");
+            }
+
+            if (reader.getVersion() != VERSION[3]) {
+              throw new VersionMismatchException(VERSION[3],
+                  reader.getVersion());
+            }
+
+            if (metadataOption != null) {
+              LOG.info("MetaData Option is ignored during append");
+            }
+            metadataOption = (MetadataOption) SequenceFile.Writer
+                .metadata(reader.getMetadata());
+
+            CompressionOption readerCompressionOption = new CompressionOption(
+                reader.getCompressionType(), reader.getCompressionCodec());
+
+            if (readerCompressionOption.value != compressionTypeOption.value
+                || !readerCompressionOption.codec.getClass().getName()
+                    .equals(compressionTypeOption.codec.getClass().getName())) {
+              throw new IllegalArgumentException(
+                  "Compression option provided does not match the file");
+            }
+
+            sync = reader.getSync();
+
+          } finally {
+            reader.close();
+          }
+
+          out = fs.append(p, bufferSize, progress);
+          this.appendMode = true;
+        } else {
+          out = fs
+              .create(p, true, bufferSize, replication, blockSize, progress);
+        }
       } else {
         out = streamOption.getValue();
       }
@@ -1157,7 +1219,7 @@ public class SequenceFile {
       out.write(sync);                       // write the sync bytes
       out.flush();                           // flush header
     }
-    
+
     /** Initialize. */
     @SuppressWarnings("unchecked")
     void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
@@ -1212,7 +1274,12 @@ public class SequenceFile {
         }
         this.compressedValSerializer.open(deflateOut);
       }
-      writeFileHeader();
+
+      if (appendMode) {
+        sync();
+      } else {
+        writeFileHeader();
+      }
     }
     
     /** Returns the class of keys in this file. */
@@ -2043,6 +2110,14 @@ public class SequenceFile {
     /** Returns the compression codec of data in this file. */
     public CompressionCodec getCompressionCodec() { return codec; }
     
+    private byte[] getSync() {
+      return sync;
+    }
+
+    private byte getVersion() {
+      return version;
+    }
+
     /**
      * Get the compression type for this file.
      * @return the compression type

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
new file mode 100644
index 0000000..4576642
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSequenceFileAppend {
+
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static Path ROOT_PATH = new Path(System.getProperty(
+      "test.build.data", "build/test/data"));
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.JavaSerialization");
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
+    fs = FileSystem.get(conf);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    fs.close();
+  }
+
+  @Test(timeout = 30000)
+  public void testAppend() throws Exception {
+
+    Path file = new Path(ROOT_PATH, "testseqappend.seq");
+    fs.delete(file, true);
+
+    Text key1 = new Text("Key1");
+    Text value1 = new Text("Value1");
+    Text value2 = new Text("Updated");
+
+    SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+    metadata.set(key1, value1);
+    Writer.Option metadataOption = Writer.metadata(metadata);
+
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), metadataOption);
+
+    writer.append(1L, "one");
+    writer.append(2L, "two");
+    writer.close();
+
+    verify2Values(file);
+
+    metadata.set(key1, value2);
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), metadataOption);
+
+    // Verify the Meta data is not changed
+    assertEquals(value1, writer.metadata.get(key1));
+
+    writer.append(3L, "three");
+    writer.append(4L, "four");
+
+    writer.close();
+
+    verifyAll4Values(file);
+
+    // Verify the Meta data readable after append
+    Reader reader = new Reader(conf, Reader.file(file));
+    assertEquals(value1, reader.getMetadata().get(key1));
+    reader.close();
+
+    // Verify failure if the compression details are different
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.RECORD,
+          new GzipCodec());
+
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException IAE) {
+      // Expected exception. Ignore it
+    }
+
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.BLOCK,
+          new DefaultCodec());
+
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException IAE) {
+      // Expected exception. Ignore it
+    }
+
+    fs.deleteOnExit(file);
+  }
+
+  @Test(timeout = 30000)
+  public void testAppendRecordCompression() throws Exception {
+
+    Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
+    fs.delete(file, true);
+
+    Option compressOption = Writer.compression(CompressionType.RECORD,
+        new GzipCodec());
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), compressOption);
+
+    writer.append(1L, "one");
+    writer.append(2L, "two");
+    writer.close();
+
+    verify2Values(file);
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), compressOption);
+
+    writer.append(3L, "three");
+    writer.append(4L, "four");
+    writer.close();
+
+    verifyAll4Values(file);
+
+    fs.deleteOnExit(file);
+  }
+
+  @Test(timeout = 30000)
+  public void testAppendBlockCompression() throws Exception {
+
+    Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
+    fs.delete(file, true);
+
+    Option compressOption = Writer.compression(CompressionType.BLOCK,
+        new GzipCodec());
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), compressOption);
+
+    writer.append(1L, "one");
+    writer.append(2L, "two");
+    writer.close();
+
+    verify2Values(file);
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), compressOption);
+
+    writer.append(3L, "three");
+    writer.append(4L, "four");
+    writer.close();
+
+    verifyAll4Values(file);
+
+    // Verify failure if the compression details are different or not Provided
+    try {
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true));
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException IAE) {
+      // Expected exception. Ignore it
+    }
+
+    // Verify failure if the compression details are different
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.RECORD,
+          new GzipCodec());
+
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException IAE) {
+      // Expected exception. Ignore it
+    }
+
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.BLOCK,
+          new DefaultCodec());
+
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException IAE) {
+      // Expected exception. Ignore it
+    }
+
+    fs.deleteOnExit(file);
+  }
+
+  @Test(timeout = 30000)
+  public void testAppendSort() throws Exception {
+    Path file = new Path(ROOT_PATH, "testseqappendSort.seq");
+    fs.delete(file, true);
+
+    Path sortedFile = new Path(ROOT_PATH, "testseqappendSort.seq.sort");
+    fs.delete(sortedFile, true);
+
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+        new JavaSerializationComparator<Long>(), Long.class, String.class, conf);
+
+    Option compressOption = Writer.compression(CompressionType.BLOCK,
+        new GzipCodec());
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), compressOption);
+
+    writer.append(2L, "two");
+    writer.append(1L, "one");
+
+    writer.close();
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), compressOption);
+
+    writer.append(4L, "four");
+    writer.append(3L, "three");
+    writer.close();
+
+    // Sort file after append
+    sorter.sort(file, sortedFile);
+    verifyAll4Values(sortedFile);
+
+    fs.deleteOnExit(file);
+    fs.deleteOnExit(sortedFile);
+  }
+
+  private void verify2Values(Path file) throws IOException {
+    Reader reader = new Reader(conf, Reader.file(file));
+    assertEquals(1L, reader.next((Object) null));
+    assertEquals("one", reader.getCurrentValue((Object) null));
+    assertEquals(2L, reader.next((Object) null));
+    assertEquals("two", reader.getCurrentValue((Object) null));
+    assertNull(reader.next((Object) null));
+    reader.close();
+  }
+
+  private void verifyAll4Values(Path file) throws IOException {
+    Reader reader = new Reader(conf, Reader.file(file));
+    assertEquals(1L, reader.next((Object) null));
+    assertEquals("one", reader.getCurrentValue((Object) null));
+    assertEquals(2L, reader.next((Object) null));
+    assertEquals("two", reader.getCurrentValue((Object) null));
+    assertEquals(3L, reader.next((Object) null));
+    assertEquals("three", reader.getCurrentValue((Object) null));
+    assertEquals(4L, reader.next((Object) null));
+    assertEquals("four", reader.getCurrentValue((Object) null));
+    assertNull(reader.next((Object) null));
+    reader.close();
+  }
+}
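
A minimal usage sketch of the new appendIfExists option, following the tests above. The file path is a placeholder, and, as in the test setup, the local filesystem is forced to RawLocalFileSystem because the checksummed local filesystem does not support append.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.SequenceFile.Writer;
  import org.apache.hadoop.io.Text;

  public class SequenceFileAppendExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Plain local appends need RawLocalFileSystem (see the test setup above).
      conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
      Path file = new Path(System.getProperty("java.io.tmpdir"), "append-example.seq");

      // Create the file and write an initial record.
      Writer writer = SequenceFile.createWriter(conf,
          Writer.file(file),
          Writer.keyClass(Text.class),
          Writer.valueClass(Text.class));
      writer.append(new Text("k1"), new Text("v1"));
      writer.close();

      // Reopen the same file in append mode. Key/value classes and compression
      // must match the existing header, or an IllegalArgumentException is thrown.
      writer = SequenceFile.createWriter(conf,
          Writer.file(file),
          Writer.keyClass(Text.class),
          Writer.valueClass(Text.class),
          Writer.appendIfExists(true));
      writer.append(new Text("k2"), new Text("v2"));
      writer.close();
    }
  }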


[2/9] hadoop git commit: YARN-3585. NodeManager cannot exit on SHUTDOWN event triggered and NM recovery is enabled. Contributed by Rohith Sharmaks (cherry picked from commit e13b671aa510f553f4a6a232b4694b6a4cce88ae)

Posted by vi...@apache.org.
YARN-3585. NodeManager cannot exit on SHUTDOWN event triggered and NM recovery is enabled. Contributed by Rohith Sharmaks
(cherry picked from commit e13b671aa510f553f4a6a232b4694b6a4cce88ae)

(cherry picked from commit 752caa95a40d899e1bf98bc907e91aec2bb57073)
(cherry picked from commit 13c4db632b0e7f19dcfa883c2492431c2c7d0799)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/193d8d36
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/193d8d36
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/193d8d36

Branch: refs/heads/branch-2.6.1
Commit: 193d8d3667c1a6bfe024e9d02fdabc0d7638e7f7
Parents: a2434dc
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jun 3 19:44:07 2015 +0000
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 14:09:16 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                    |  3 +++
 .../yarn/server/nodemanager/NodeManager.java       | 17 +++++++++++++++--
 2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d8d36/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4ce9f9f..6b46619 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -153,6 +153,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3725. App submission via REST API is broken in secure mode due to
     Timeline DT service address is empty. (Zhijie Shen via wangda)
 
+    YARN-3585. NodeManager cannot exit on SHUTDOWN event triggered and NM
+    recovery is enabled (Rohith Sharmaks via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/193d8d36/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 063ae87..0ec86d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
@@ -88,6 +89,7 @@ public class NodeManager extends CompositeService
   
   private AtomicBoolean isStopping = new AtomicBoolean(false);
   private boolean rmWorkPreservingRestartEnabled;
+  private boolean shouldExitOnShutdownEvent = false;
 
   public NodeManager() {
     super(NodeManager.class.getName());
@@ -287,7 +289,16 @@ public class NodeManager extends CompositeService
     new Thread() {
       @Override
       public void run() {
-        NodeManager.this.stop();
+        try {
+          NodeManager.this.stop();
+        } catch (Throwable t) {
+          LOG.error("Error while shutting down NodeManager", t);
+        } finally {
+          if (shouldExitOnShutdownEvent
+              && !ShutdownHookManager.get().isShutdownInProgress()) {
+            ExitUtil.terminate(-1);
+          }
+        }
       }
     }.start();
   }
@@ -463,7 +474,9 @@ public class NodeManager extends CompositeService
       nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
       ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook,
                                                 SHUTDOWN_HOOK_PRIORITY);
-
+      // System exit should be called only when NodeManager is instantiated from
+      // main() funtion
+      this.shouldExitOnShutdownEvent = true;
       this.init(conf);
       this.start();
     } catch (Throwable t) {
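
The change follows a general pattern for CompositeService daemons: stop on a separate thread, log any failure, and force the JVM to exit only when the daemon owns the process (it was started from main()) and no JVM shutdown hook is already running. A condensed, illustrative sketch of that pattern, with placeholder class and method names:

  // Illustrative pattern only; condensed from the NodeManager change above.
  import org.apache.hadoop.service.CompositeService;
  import org.apache.hadoop.util.ExitUtil;
  import org.apache.hadoop.util.ShutdownHookManager;

  class ExampleDaemon extends CompositeService {
    // True only when created from main(), so embedded/test instances never
    // terminate the JVM.
    private volatile boolean shouldExitOnShutdownEvent = false;

    ExampleDaemon() {
      super(ExampleDaemon.class.getName());
    }

    void markStartedFromMain() {
      this.shouldExitOnShutdownEvent = true;
    }

    void shutdownAsync() {
      new Thread() {
        @Override
        public void run() {
          try {
            ExampleDaemon.this.stop();
          } catch (Throwable t) {
            System.err.println("Error while shutting down: " + t);
          } finally {
            // Skip the exit if a JVM shutdown hook is already running, since
            // calling System.exit from inside a hook can block.
            if (shouldExitOnShutdownEvent
                && !ShutdownHookManager.get().isShutdownInProgress()) {
              ExitUtil.terminate(-1);
            }
          }
        }
      }.start();
    }
  }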


[8/9] hadoop git commit: HADOOP-11932. MetricsSinkAdapter may hang when being stopped. Contributed by Brahma Reddy Battula (cherry picked from commit f59612edd74d1bef2b60870c24c1f67c56b2b3cb)

Posted by vi...@apache.org.
HADOOP-11932. MetricsSinkAdapter may hang  when being stopped. Contributed by Brahma Reddy Battula
(cherry picked from commit f59612edd74d1bef2b60870c24c1f67c56b2b3cb)

(cherry picked from commit 5950c1f6f8ac6f514f8d2e8bfbd1f71747b097de)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8ddfea4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8ddfea4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8ddfea4

Branch: refs/heads/branch-2.6.1
Commit: d8ddfea491a7ccf5c074eb339f06015186c0d2b0
Parents: ca7fe71
Author: Jian He <ji...@apache.org>
Authored: Wed Aug 5 16:12:45 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 14:45:04 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../metrics2/impl/MetricsSinkAdapter.java       |  6 +-
 .../metrics2/impl/TestMetricsSystemImpl.java    | 60 ++++++++++++++++++++
 3 files changed, 66 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddfea4/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index c222b8e..57c7eae 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -69,6 +69,9 @@ Release 2.6.1 - UNRELEASED
     HADOOP-8151. Error handling in snappy decompressor throws invalid
     exceptions. (Matt Foley via harsh)
 
+    HADOOP-11932. MetricsSinkAdapter may hang  when being stopped.
+    (Brahma Reddy Battula via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddfea4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
index de39a13..da24b8e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
@@ -198,15 +198,15 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
   void stop() {
     stopping = true;
     sinkThread.interrupt();
+    if (sink instanceof Closeable) {
+      IOUtils.cleanup(LOG, (Closeable)sink);
+    }
     try {
       sinkThread.join();
     }
     catch (InterruptedException e) {
       LOG.warn("Stop interrupted", e);
     }
-    if (sink instanceof Closeable) {
-      IOUtils.cleanup(LOG, (Closeable)sink);
-    }
   }
 
   String name() {
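
The reordering matters because a sink whose putMetrics() loops until close() is called keeps sinkThread alive; joining before closing therefore never returns, which is exactly what the test below provokes. A standalone sketch of the same ordering issue, independent of the metrics classes:

  // Standalone illustration of the stop-ordering bug fixed above.
  public class StopOrderingSketch {
    static volatile boolean closed = false;

    public static void main(String[] args) throws InterruptedException {
      Thread worker = new Thread(new Runnable() {
        @Override
        public void run() {
          while (!closed) {
            // busy consumer, analogous to a sink spinning inside putMetrics()
          }
        }
      });
      worker.start();

      // Buggy order: calling worker.join() here would block forever, because
      // nothing has signalled the consumer to stop yet.
      //
      // Fixed order (what the patch does): close/signal first, then join.
      closed = true;
      worker.join();
    }
  }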

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddfea4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
index d59e80b..b5ebb93 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.metrics2.impl;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -434,6 +436,64 @@ public class TestMetricsSystemImpl {
                new MetricGaugeInt(MsInfo.NumActiveSinks, 3)));
   }
 
+  /**
+   * Class to verify HADOOP-11932. Instead of reading from HTTP, going in loop
+   * until closed.
+   */
+  private static class TestClosableSink implements MetricsSink, Closeable {
+
+    boolean closed = false;
+    CountDownLatch collectingLatch;
+
+    public TestClosableSink(CountDownLatch collectingLatch) {
+      this.collectingLatch = collectingLatch;
+    }
+
+    @Override
+    public void init(SubsetConfiguration conf) {
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+    }
+
+    @Override
+    public void putMetrics(MetricsRecord record) {
+      while (!closed) {
+        collectingLatch.countDown();
+      }
+    }
+
+    @Override
+    public void flush() {
+    }
+  }
+
+  /**
+   * HADOOP-11932
+   */
+  @Test(timeout = 5000)
+  public void testHangOnSinkRead() throws Exception {
+    new ConfigBuilder().add("*.period", 8)
+        .add("test.sink.test.class", TestSink.class.getName())
+        .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+    MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+    ms.start();
+    try {
+      CountDownLatch collectingLatch = new CountDownLatch(1);
+      MetricsSink sink = new TestClosableSink(collectingLatch);
+      ms.registerSink("closeableSink",
+          "The sink will be used to test closeability", sink);
+      // trigger metric collection first time
+      ms.onTimerEvent();
+      // Make sure that sink is collecting metrics
+      assertTrue(collectingLatch.await(1, TimeUnit.SECONDS));
+    } finally {
+      ms.stop();
+    }
+  }
+
   @Metrics(context="test")
   private static class TestSource {
     @Metric("C1 desc") MutableCounterLong c1;


[4/9] hadoop git commit: YARN-3832. Resource Localization fails on a cluster due to existing cache directories. Contributed by Brahma Reddy Battula (cherry picked from commit 8d58512d6e6d9fe93784a9de2af0056bcc316d96)

Posted by vi...@apache.org.
YARN-3832. Resource Localization fails on a cluster due to existing cache directories. Contributed by Brahma Reddy Battula
(cherry picked from commit 8d58512d6e6d9fe93784a9de2af0056bcc316d96)

(cherry picked from commit 15b1800b1289d239cbebc5cfd66cfe156d45a2d3)
(cherry picked from commit 38400507e3352d83c2a1f364de137366249b7983)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f21fb808
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f21fb808
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f21fb808

Branch: refs/heads/branch-2.6.1
Commit: f21fb808f1fd919894d0b25d65cee4730d3a6c00
Parents: 4f53c98
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jun 24 16:37:39 2015 +0000
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 14:26:43 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                                   | 3 +++
 .../containermanager/localizer/ResourceLocalizationService.java   | 2 +-
 2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f21fb808/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6b46619..1f4d851 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -156,6 +156,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3585. NodeManager cannot exit on SHUTDOWN event triggered and NM
     recovery is enabled (Rohith Sharmaks via jlowe)
 
+    YARN-3832. Resource Localization fails on a cluster due to existing cache
+    directories (Brahma Reddy Battula via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f21fb808/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index d38bb7a..2a4d350 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -1271,7 +1271,7 @@ public class ResourceLocalizationService extends CompositeService
   }
 
   private void cleanUpLocalDirs(FileContext lfs, DeletionService del) {
-    for (String localDir : dirsHandler.getLocalDirs()) {
+    for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
       cleanUpLocalDir(lfs, del, localDir);
     }
   }


[6/9] hadoop git commit: YARN-3850. NM fails to read files from full disks which can lead to container logs being lost and other issues. Contributed by Varun Saxena (cherry picked from commit 40b256949ad6f6e0dbdd248f2d257b05899f4332)

Posted by vi...@apache.org.
YARN-3850. NM fails to read files from full disks which can lead to container logs being lost and other issues. Contributed by Varun Saxena
(cherry picked from commit 40b256949ad6f6e0dbdd248f2d257b05899f4332)

(cherry picked from commit 0221d19f4e398c386f4ca3990b0893562aa8dacf)
(cherry picked from commit 87d2204f28f192a964c04a5fa1e2e31644d74b59)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fe5877a4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fe5877a4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fe5877a4

Branch: refs/heads/branch-2.6.1
Commit: fe5877a49e9a8c387a5be77edd1eb1448184271e
Parents: 34739fc
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Jun 26 15:47:07 2015 +0000
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 14:35:01 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../nodemanager/LocalDirsHandlerService.java    | 24 +++++++++
 .../launcher/RecoveredContainerLaunch.java      |  3 +-
 .../logaggregation/AppLogAggregatorImpl.java    |  4 +-
 .../nodemanager/webapp/ContainerLogsUtils.java  |  2 +-
 .../TestLogAggregationService.java              | 54 +++++++++++++++-----
 .../webapp/TestContainerLogsPage.java           | 22 +++++++-
 7 files changed, 93 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5877a4/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 1f4d851..98dc354 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -159,6 +159,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3832. Resource Localization fails on a cluster due to existing cache
     directories (Brahma Reddy Battula via jlowe)
 
+    YARN-3850. NM fails to read files from full disks which can lead to
+    container logs being lost and other issues (Varun Saxena via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5877a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
index 7d1aa53..d6950a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
@@ -213,6 +213,18 @@ public class LocalDirsHandlerService extends AbstractService {
   }
 
   /**
+   * Function to get the local dirs which should be considered for reading
+   * existing files on disk. Contains the good local dirs and the local dirs
+   * that have reached the disk space limit
+   *
+   * @return the local dirs which should be considered for reading
+   */
+  public List<String> getLocalDirsForRead() {
+    return DirectoryCollection.concat(localDirs.getGoodDirs(),
+        localDirs.getFullDirs());
+  }
+
+  /**
    * Function to get the local dirs which should be considered when cleaning up
    * resources. Contains the good local dirs and the local dirs that have reached
    * the disk space limit
@@ -225,6 +237,18 @@ public class LocalDirsHandlerService extends AbstractService {
   }
 
   /**
+   * Function to get the log dirs which should be considered for reading
+   * existing files on disk. Contains the good log dirs and the log dirs that
+   * have reached the disk space limit
+   *
+   * @return the log dirs which should be considered for reading
+   */
+  public List<String> getLogDirsForRead() {
+    return DirectoryCollection.concat(logDirs.getGoodDirs(),
+        logDirs.getFullDirs());
+  }
+
+  /**
    * Function to get the log dirs which should be considered when cleaning up
    * resources. Contains the good log dirs and the log dirs that have reached
    * the disk space limit

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5877a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
index 03a39aa..c662ecd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
@@ -121,7 +121,8 @@ public class RecoveredContainerLaunch extends ContainerLaunch {
 
   private File locatePidFile(String appIdStr, String containerIdStr) {
     String pidSubpath= getPidFileSubpath(appIdStr, containerIdStr);
-    for (String dir : getContext().getLocalDirsHandler().getLocalDirs()) {
+    for (String dir : getContext().getLocalDirsHandler().
+        getLocalDirsForRead()) {
       File pidFile = new File(dir, pidSubpath);
       if (pidFile.exists()) {
         return pidFile;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5877a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 20887b6..65c5501 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -524,10 +524,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     public Set<Path> doContainerLogAggregation(LogWriter writer) {
       LOG.info("Uploading logs for container " + containerId
           + ". Current good log dirs are "
-          + StringUtils.join(",", dirsHandler.getLogDirs()));
+          + StringUtils.join(",", dirsHandler.getLogDirsForRead()));
       final LogKey logKey = new LogKey(containerId);
       final LogValue logValue =
-          new LogValue(dirsHandler.getLogDirs(), containerId,
+          new LogValue(dirsHandler.getLogDirsForRead(), containerId,
             userUgi.getShortUserName(), logAggregationContext,
             this.uploadedFileMeta);
       try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5877a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java
index c588a89..319f49b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java
@@ -74,7 +74,7 @@ public class ContainerLogsUtils {
   
   static List<File> getContainerLogDirs(ContainerId containerId,
       LocalDirsHandlerService dirsHandler) throws YarnException {
-    List<String> logDirs = dirsHandler.getLogDirs();
+    List<String> logDirs = dirsHandler.getLogDirsForRead();
     List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
     for (String logDir : logDirs) {
       logDir = new File(logDir).toURI().getPath();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5877a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 7d911e9..e2c45db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -172,22 +172,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     dispatcher.close();
   }
 
-  @Test
-  public void testLocalFileDeletionAfterUpload() throws Exception {
-    this.delSrvc = new DeletionService(createContainerExecutor());
-    delSrvc = spy(delSrvc);
-    this.delSrvc.init(conf);
-    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
-    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-        this.remoteRootLogDir.getAbsolutePath());
-    
-    LogAggregationService logAggregationService = spy(
-        new LogAggregationService(dispatcher, this.context, this.delSrvc,
-                                  super.dirsHandler));
+  private void verifyLocalFileDeletion(
+      LogAggregationService logAggregationService) throws Exception {
     logAggregationService.init(this.conf);
     logAggregationService.start();
 
-    
     ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
 
     // AppLogDir should be created
@@ -247,11 +236,48 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
             ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
     };
 
-    checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
+    checkEvents(appEventHandler, expectedEvents, true, "getType",
+        "getApplicationID");
     dispatcher.stop();
   }
 
   @Test
+  public void testLocalFileDeletionAfterUpload() throws Exception {
+    this.delSrvc = new DeletionService(createContainerExecutor());
+    delSrvc = spy(delSrvc);
+    this.delSrvc.init(conf);
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+
+    LogAggregationService logAggregationService = spy(
+        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+                                  super.dirsHandler));
+    verifyLocalFileDeletion(logAggregationService);
+  }
+
+  @Test
+  public void testLocalFileDeletionOnDiskFull() throws Exception {
+    this.delSrvc = new DeletionService(createContainerExecutor());
+    delSrvc = spy(delSrvc);
+    this.delSrvc.init(conf);
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+    List<String> logDirs = super.dirsHandler.getLogDirs();
+    LocalDirsHandlerService dirsHandler = spy(super.dirsHandler);
+    // Simulate disk being full by returning no good log dirs but having a
+    // directory in full log dirs.
+    when(dirsHandler.getLogDirs()).thenReturn(new ArrayList<String>());
+    when(dirsHandler.getLogDirsForRead()).thenReturn(logDirs);
+    LogAggregationService logAggregationService = spy(
+        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+            dirsHandler));
+    verifyLocalFileDeletion(logAggregationService);
+  }
+
+
+  @Test
   public void testNoContainerOnNode() throws Exception {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5877a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
index b1d4397..39c52d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp;
 
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
 
@@ -29,6 +30,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -115,6 +117,24 @@ public class TestContainerLogsPage {
     Assert.assertNull(nmContext.getContainers().get(container1));
     files = ContainerLogsUtils.getContainerLogDirs(container1, user, nmContext);
     Assert.assertTrue(!(files.get(0).toString().contains("file:")));
+
+    // Create a new context to check that the correct container log dirs are
+    // fetched when the disk is full.
+    LocalDirsHandlerService dirsHandlerForFullDisk = spy(dirsHandler);
+    // good log dirs are empty and nm log dir is in the full log dir list.
+    when(dirsHandlerForFullDisk.getLogDirs()).
+        thenReturn(new ArrayList<String>());
+    when(dirsHandlerForFullDisk.getLogDirsForRead()).
+        thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()}));
+    nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk,
+        new ApplicationACLsManager(conf), new NMNullStateStoreService());
+    nmContext.getApplications().put(appId, app);
+    container.setState(ContainerState.RUNNING);
+    nmContext.getContainers().put(container1, container);
+    List<File> dirs =
+        ContainerLogsUtils.getContainerLogDirs(container1, user, nmContext);
+    File containerLogDir = new File(absLogDir, appId + "/" + container1);
+    Assert.assertTrue(dirs.contains(containerLogDir));
   }
   
   @Test(timeout = 10000)
@@ -224,7 +244,7 @@ public class TestContainerLogsPage {
     LocalDirsHandlerService localDirs = mock(LocalDirsHandlerService.class);
     List<String> logDirs = new ArrayList<String>();
     logDirs.add("F:/nmlogs");
-    when(localDirs.getLogDirs()).thenReturn(logDirs);
+    when(localDirs.getLogDirsForRead()).thenReturn(logDirs);
     
     ApplicationIdPBImpl appId = mock(ApplicationIdPBImpl.class);
     when(appId.toString()).thenReturn("app_id_1");


[7/9] hadoop git commit: YARN-3990. AsyncDispatcher may be overloaded with RMAppNodeUpdateEvent when Node is connected/disconnected. Contributed by Bibin A Chundatt (cherry picked from commit 32e490b6c035487e99df30ce80366446fe09bd6c)

Posted by vi...@apache.org.
YARN-3990. AsyncDispatcher may be overloaded with RMAppNodeUpdateEvent when Node is connected/disconnected. Contributed by Bibin A Chundatt
(cherry picked from commit 32e490b6c035487e99df30ce80366446fe09bd6c)

(cherry picked from commit c31e3ba92132f232bd56b257f3854ffe430fbab9)
(cherry picked from commit 07d31d4c0808a169f4770187d655f38aa105255c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ca7fe710
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ca7fe710
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ca7fe710

Branch: refs/heads/branch-2.6.1
Commit: ca7fe710007462cf9c76bb6146ac8915000fdae7
Parents: fe5877a
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Jul 31 17:37:24 2015 +0000
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 14:40:20 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../resourcemanager/NodesListManager.java       |  28 ++--
 .../rmapp/TestNodesListManager.java             | 162 +++++++++++++++++++
 3 files changed, 181 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca7fe710/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 98dc354..60ae3d0 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -162,6 +162,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3850. NM fails to read files from full disks which can lead to
     container logs being lost and other issues (Varun Saxena via jlowe)
 
+    YARN-3990. AsyncDispatcher may be overloaded with RMAppNodeUpdateEvent
+    when Node is connected/disconnected (Bibin A Chundatt via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca7fe710/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 90d7b51..a9c064c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -160,12 +160,14 @@ public class NodesListManager extends AbstractService implements
       LOG.debug(eventNode + " reported unusable");
       unusableRMNodesConcurrentSet.add(eventNode);
       for(RMApp app: rmContext.getRMApps().values()) {
-        this.rmContext
-            .getDispatcher()
-            .getEventHandler()
-            .handle(
-                new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
-                    RMAppNodeUpdateType.NODE_UNUSABLE));
+        if (!app.isAppFinalStateStored()) {
+          this.rmContext
+              .getDispatcher()
+              .getEventHandler()
+              .handle(
+                  new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
+                      RMAppNodeUpdateType.NODE_UNUSABLE));
+        }
       }
       break;
     case NODE_USABLE:
@@ -174,12 +176,14 @@ public class NodesListManager extends AbstractService implements
         unusableRMNodesConcurrentSet.remove(eventNode);
       }
       for (RMApp app : rmContext.getRMApps().values()) {
-        this.rmContext
-            .getDispatcher()
-            .getEventHandler()
-            .handle(
-                new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
-                    RMAppNodeUpdateType.NODE_USABLE));
+        if (!app.isAppFinalStateStored()) {
+          this.rmContext
+              .getDispatcher()
+              .getEventHandler()
+              .handle(
+                  new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
+                      RMAppNodeUpdateType.NODE_USABLE));
+        }
       }
       break;
     default:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca7fe710/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
new file mode 100644
index 0000000..5330976
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+public class TestNodesListManager {
+  // To hold the list of applications for which an event was received
+  ArrayList<ApplicationId> applist = new ArrayList<ApplicationId>();
+
+  @Test(timeout = 300000)
+  public void testNodeUsableEvent() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    final Dispatcher dispatcher = getDispatcher();
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 28000);
+    NodesListManager nodesListManager = rm.getNodesListManager();
+    Resource clusterResource = Resource.newInstance(28000, 8);
+    RMNode rmnode = MockNodes.newNodeInfo(1, clusterResource);
+
+    // Create an app and kill it
+    RMApp killrmApp = rm.submitApp(200);
+    rm.killApp(killrmApp.getApplicationId());
+    rm.waitForState(killrmApp.getApplicationId(), RMAppState.KILLED);
+
+    // Create an app and let it finish
+    RMApp finshrmApp = rm.submitApp(2000);
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt = finshrmApp.getCurrentAppAttempt();
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+    am.unregisterAppAttempt();
+    nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FINISHED);
+
+    // Create a submitted app
+    RMApp subrmApp = rm.submitApp(200);
+
+    // Fire Event for NODE_USABLE
+    nodesListManager.handle(new NodesListManagerEvent(
+        NodesListManagerEventType.NODE_USABLE, rmnode));
+    if (applist.size() > 0) {
+      Assert.assertTrue(
+          "Event based on running app expected " + subrmApp.getApplicationId(),
+          applist.contains(subrmApp.getApplicationId()));
+      Assert.assertFalse(
+          "Event based on finish app not expected "
+              + finshrmApp.getApplicationId(),
+          applist.contains(finshrmApp.getApplicationId()));
+      Assert.assertFalse(
+          "Event based on killed app not expected "
+              + killrmApp.getApplicationId(),
+          applist.contains(killrmApp.getApplicationId()));
+    } else {
+      Assert.fail("Events received should have beeen more than 1");
+    }
+    applist.clear();
+
+    // Fire Event for NODE_UNUSABLE
+    nodesListManager.handle(new NodesListManagerEvent(
+        NodesListManagerEventType.NODE_UNUSABLE, rmnode));
+    if (applist.size() > 0) {
+      Assert.assertTrue(
+          "Event based on running app expected " + subrmApp.getApplicationId(),
+          applist.contains(subrmApp.getApplicationId()));
+      Assert.assertFalse(
+          "Event based on finish app not expected "
+              + finshrmApp.getApplicationId(),
+          applist.contains(finshrmApp.getApplicationId()));
+      Assert.assertFalse(
+          "Event based on killed app not expected "
+              + killrmApp.getApplicationId(),
+          applist.contains(killrmApp.getApplicationId()));
+    } else {
+      Assert.fail("Events received should have beeen more than 1");
+    }
+
+  }
+
+  /*
+   * Create a dispatcher that records the app id of each RMAppNodeUpdateEvent
+   */
+  private Dispatcher getDispatcher() {
+    Dispatcher dispatcher = new AsyncDispatcher() {
+      @SuppressWarnings({ "rawtypes", "unchecked" })
+      @Override
+      public EventHandler getEventHandler() {
+
+        class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
+          @Override
+          public boolean matches(Object argument) {
+            if (argument instanceof RMAppNodeUpdateEvent) {
+              ApplicationId appid =
+                  ((RMAppNodeUpdateEvent) argument).getApplicationId();
+              applist.add(appid);
+            }
+            return false;
+          }
+        }
+
+        EventHandler handler = spy(super.getEventHandler());
+        doNothing().when(handler).handle(argThat(new EventArgMatcher()));
+        return handler;
+      }
+    };
+    return dispatcher;
+  }
+
+}


[5/9] hadoop git commit: HADOOP-8151. Error handling in snappy decompressor throws invalid exceptions. Contributed by Matt Foley. (harsh)

Posted by vi...@apache.org.
HADOOP-8151. Error handling in snappy decompressor throws invalid exceptions. Contributed by Matt Foley. (harsh)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1389006 13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit ac31d6a4485d7ff9074fb5dade7a6cf5292bb347)

Conflicts:

	hadoop-common-project/hadoop-common/CHANGES.txt

(cherry picked from commit 55427fb66c6d52ce98b4d68a29b592a734014c28)
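
For readers skimming the diff below, the change is subtler than a one-word edit: JNI's FindClass expects a class name in internal form ("java/lang/InternalError"), not a descriptor-style string ("Ljava/lang/InternalError"). With the descriptor form, FindClass fails (typically leaving a NoClassDefFoundError pending), so the intended InternalError was never raised; that is the "invalid exceptions" this patch fixes. Below is a minimal C sketch of the pattern, assuming the THROW macro in the Hadoop native glue reduces to a FindClass/ThrowNew pair; the helper name throw_by_name is illustrative and not part of the source tree.

#include <jni.h>

/*
 * Illustrative helper only: throw a Java exception from native code by class
 * name. FindClass wants the internal form of the name; given a descriptor-like
 * string such as "Ljava/lang/InternalError" it fails and leaves its own
 * NoClassDefFoundError pending instead of the intended exception.
 */
static void throw_by_name(JNIEnv *env, const char *class_name, const char *msg)
{
  jclass clazz = (*env)->FindClass(env, class_name);
  if (clazz != NULL) {
    (*env)->ThrowNew(env, clazz, msg);   /* queue the exception for the caller */
    (*env)->DeleteLocalRef(env, clazz);  /* drop the local class reference */
  }
  /* if FindClass returned NULL, its own error is already pending */
}

/* Usage mirroring the corrected calls in the snappy/lz4 glue code: */
/*   throw_by_name(env, "java/lang/InternalError", "Could not decompress data."); */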


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34739fc9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34739fc9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34739fc9

Branch: refs/heads/branch-2.6.1
Commit: 34739fc91e10caebd61ed81c7397183983f431f3
Parents: f21fb80
Author: Harsh J <ha...@apache.org>
Authored: Sun Sep 23 10:37:52 2012 +0000
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 14:30:35 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt                | 3 +++
 .../src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c      | 2 +-
 .../src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c    | 2 +-
 .../org/apache/hadoop/io/compress/snappy/SnappyCompressor.c    | 4 ++--
 .../org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c  | 6 +++---
 5 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/34739fc9/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index c3d18a1..c222b8e 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -66,6 +66,9 @@ Release 2.6.1 - UNRELEASED
     HADOOP-11491. HarFs incorrectly declared as requiring an authority.
     (Brahma Reddy Battula via gera)
 
+    HADOOP-8151. Error handling in snappy decompressor throws invalid
+    exceptions. (Matt Foley via harsh)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34739fc9/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c
index 58544f5..9f14312 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c
@@ -83,7 +83,7 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_comp
 
   compressed_direct_buf_len = LZ4_compress(uncompressed_bytes, compressed_bytes, uncompressed_direct_buf_len);
   if (compressed_direct_buf_len < 0){
-    THROW(env, "Ljava/lang/InternalError", "LZ4_compress failed");
+    THROW(env, "java/lang/InternalError", "LZ4_compress failed");
   }
 
   (*env)->SetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen, 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34739fc9/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c
index 6570303..2b8c91c 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c
@@ -80,7 +80,7 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_de
 
   uncompressed_direct_buf_len = LZ4_decompress_safe(compressed_bytes, uncompressed_bytes, compressed_direct_buf_len, uncompressed_direct_buf_len);
   if (uncompressed_direct_buf_len < 0) {
-    THROW(env, "Ljava/lang/InternalError", "LZ4_uncompress_unknownOutputSize failed.");
+    THROW(env, "java/lang/InternalError", "LZ4_uncompress_unknownOutputSize failed.");
   }
 
   (*env)->SetIntField(env, thisj, Lz4Decompressor_compressedDirectBufLen, 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34739fc9/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c
index 65c978b..fe827f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c
@@ -134,11 +134,11 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompresso
   ret = dlsym_snappy_compress(uncompressed_bytes, uncompressed_direct_buf_len,
         compressed_bytes, &buf_len);
   if (ret != SNAPPY_OK){
-    THROW(env, "Ljava/lang/InternalError", "Could not compress data. Buffer length is too small.");
+    THROW(env, "java/lang/InternalError", "Could not compress data. Buffer length is too small.");
     return 0;
   }
   if (buf_len > JINT_MAX) {
-    THROW(env, "Ljava/lang/InternalError", "Invalid return buffer length.");
+    THROW(env, "java/lang/InternalError", "Invalid return buffer length.");
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34739fc9/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c
index 022f2b0..d1fd13c 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c
@@ -126,11 +126,11 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompres
   ret = dlsym_snappy_uncompress(compressed_bytes, compressed_direct_buf_len,
         uncompressed_bytes, &uncompressed_direct_buf_len);
   if (ret == SNAPPY_BUFFER_TOO_SMALL){
-    THROW(env, "Ljava/lang/InternalError", "Could not decompress data. Buffer length is too small.");
+    THROW(env, "java/lang/InternalError", "Could not decompress data. Buffer length is too small.");
   } else if (ret == SNAPPY_INVALID_INPUT){
-    THROW(env, "Ljava/lang/InternalError", "Could not decompress data. Input is invalid.");
+    THROW(env, "java/lang/InternalError", "Could not decompress data. Input is invalid.");
   } else if (ret != SNAPPY_OK){
-    THROW(env, "Ljava/lang/InternalError", "Could not decompress data.");
+    THROW(env, "java/lang/InternalError", "Could not decompress data.");
   }
 
   (*env)->SetIntField(env, thisj, SnappyDecompressor_compressedDirectBufLen, 0);


[9/9] hadoop git commit: HADOOP-12280. Skip unit tests based on maven profile rather than NativeCodeLoader.isNativeCodeLoaded (Masatake Iwasaki via Colin P. McCabe)

Posted by vi...@apache.org.
HADOOP-12280. Skip unit tests based on maven profile rather than NativeCodeLoader.isNativeCodeLoaded (Masatake Iwasaki via Colin P. McCabe)

(cherry picked from commit 6f83274afc1eba1159427684d72d8f13778c5a88)
(cherry picked from commit e92107b18f82b3501deaa6170d322a0fb512ec71)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3bd9b745
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3bd9b745
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3bd9b745

Branch: refs/heads/branch-2.6.1
Commit: 3bd9b7459bfe2e4d81d60498832dc297cd01e003
Parents: d8ddfea
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Tue Aug 4 13:51:04 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 15:02:33 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt          |  3 +++
 .../java/org/apache/hadoop/crypto/TestCryptoCodec.java   | 11 +++--------
 .../TestCryptoStreamsWithOpensslAesCtrCryptoCodec.java   |  2 ++
 .../org/apache/hadoop/io/TestSequenceFileAppend.java     |  5 +++++
 .../java/org/apache/hadoop/test/GenericTestUtils.java    | 11 +++++++++++
 5 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bd9b745/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 57c7eae..c4735a0 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -11,6 +11,9 @@ Release 2.6.1 - UNRELEASED
     HADOOP-7139. Allow appending to existing SequenceFiles
     (kanaka kumar avvaru via vinayakumarb)
 
+    HADOOP-12280. Skip unit tests based on maven profile rather than
+    NativeCodeLoader.isNativeCodeLoaded (Masatake Iwasaki via Colin P. McCabe)
+
   OPTIMIZATIONS
 
     HADOOP-11238. Update the NameNode's Group Cache in the background when

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bd9b745/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoCodec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoCodec.java
index 08231f9..e13df57 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoCodec.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.Assert;
@@ -63,10 +64,7 @@ public class TestCryptoCodec {
   
   @Test(timeout=120000)
   public void testJceAesCtrCryptoCodec() throws Exception {
-    if (!"true".equalsIgnoreCase(System.getProperty("runningWithNative"))) {
-      LOG.warn("Skipping since test was not run with -Pnative flag");
-      Assume.assumeTrue(false);
-    }
+    GenericTestUtils.assumeInNativeProfile();
     if (!NativeCodeLoader.buildSupportsOpenssl()) {
       LOG.warn("Skipping test since openSSL library not loaded");
       Assume.assumeTrue(false);
@@ -79,10 +77,7 @@ public class TestCryptoCodec {
   
   @Test(timeout=120000)
   public void testOpensslAesCtrCryptoCodec() throws Exception {
-    if (!"true".equalsIgnoreCase(System.getProperty("runningWithNative"))) {
-      LOG.warn("Skipping since test was not run with -Pnative flag");
-      Assume.assumeTrue(false);
-    }
+    GenericTestUtils.assumeInNativeProfile();
     if (!NativeCodeLoader.buildSupportsOpenssl()) {
       LOG.warn("Skipping test since openSSL library not loaded");
       Assume.assumeTrue(false);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bd9b745/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsWithOpensslAesCtrCryptoCodec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsWithOpensslAesCtrCryptoCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsWithOpensslAesCtrCryptoCodec.java
index f64e8dc..257ad5d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsWithOpensslAesCtrCryptoCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsWithOpensslAesCtrCryptoCodec.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.crypto;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.BeforeClass;
 
 public class TestCryptoStreamsWithOpensslAesCtrCryptoCodec 
@@ -25,6 +26,7 @@ public class TestCryptoStreamsWithOpensslAesCtrCryptoCodec
   
   @BeforeClass
   public static void init() throws Exception {
+    GenericTestUtils.assumeInNativeProfile();
     Configuration conf = new Configuration();
     codec = CryptoCodec.getInstance(conf);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bd9b745/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
index 4576642..be4ab92 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.SequenceFile.Writer.Option;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -140,6 +141,7 @@ public class TestSequenceFileAppend {
 
   @Test(timeout = 30000)
   public void testAppendRecordCompression() throws Exception {
+    GenericTestUtils.assumeInNativeProfile();
 
     Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
     fs.delete(file, true);
@@ -173,6 +175,7 @@ public class TestSequenceFileAppend {
 
   @Test(timeout = 30000)
   public void testAppendBlockCompression() throws Exception {
+    GenericTestUtils.assumeInNativeProfile();
 
     Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
     fs.delete(file, true);
@@ -247,6 +250,8 @@ public class TestSequenceFileAppend {
 
   @Test(timeout = 30000)
   public void testAppendSort() throws Exception {
+    GenericTestUtils.assumeInNativeProfile();
+
     Path file = new Path(ROOT_PATH, "testseqappendSort.seq");
     fs.delete(file, true);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bd9b745/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 7be71e9..0616887 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -41,6 +41,7 @@ import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
 import org.apache.log4j.WriterAppender;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -368,4 +369,14 @@ public abstract class GenericTestUtils {
       }
     }
   }
+
+  /**
+   * Skip the test if the Maven native build profile is not activated.
+   * Sub-projects using this must set the 'runningWithNative' property to
+   * true in the definition of the native profile in pom.xml.
+   */
+  public static void assumeInNativeProfile() {
+    Assume.assumeTrue(
+        Boolean.valueOf(System.getProperty("runningWithNative", "false")));
+  }
 }