You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/02/19 18:52:58 UTC

svn commit: r1569856 - /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java

Author: vinodkv
Date: Wed Feb 19 17:52:57 2014
New Revision: 1569856

URL: http://svn.apache.org/r1569856
Log:
YARN-1666. Modified RM HA handling of include/exclude node-lists to be available across RM failover by making use of a remote configuration-provider. Contributed by Xuan Gong.

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java?rev=1569856&r1=1569855&r2=1569856&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java Wed Feb 19 17:52:57 2014
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 
 // Keeps track of which datanodes/tasktrackers are allowed to connect to the 
@@ -48,13 +49,30 @@ public class HostsFileReader {
     refresh();
   }
 
+  @Private
+  public HostsFileReader(String includesFile, InputStream inFileInputStream,
+      String excludesFile, InputStream exFileInputStream) throws IOException {
+    includes = new HashSet<String>();
+    excludes = new HashSet<String>();
+    this.includesFile = includesFile;
+    this.excludesFile = excludesFile;
+    refresh(inFileInputStream, exFileInputStream);
+  }
+
   public static void readFileToSet(String type,
       String filename, Set<String> set) throws IOException {
     File file = new File(filename);
     FileInputStream fis = new FileInputStream(file);
+    readFileToSetWithFileInputStream(type, filename, fis, set);
+  }
+
+  @Private
+  public static void readFileToSetWithFileInputStream(String type,
+      String filename, InputStream fileInputStream, Set<String> set)
+      throws IOException {
     BufferedReader reader = null;
     try {
-      reader = new BufferedReader(new InputStreamReader(fis));
+      reader = new BufferedReader(new InputStreamReader(fileInputStream));
       String line;
       while ((line = reader.readLine()) != null) {
         String[] nodes = line.split("[ \t\n\f\r]+");
@@ -71,26 +89,63 @@ public class HostsFileReader {
             }
           }
         }
-      }   
+      }
     } finally {
       if (reader != null) {
         reader.close();
       }
-      fis.close();
-    }  
+      fileInputStream.close();
+    }
   }
 
   public synchronized void refresh() throws IOException {
     LOG.info("Refreshing hosts (include/exclude) list");
+    Set<String> newIncludes = new HashSet<String>();
+    Set<String> newExcludes = new HashSet<String>();
+    boolean switchIncludes = false;
+    boolean switchExcludes = false;
     if (!includesFile.isEmpty()) {
-      Set<String> newIncludes = new HashSet<String>();
       readFileToSet("included", includesFile, newIncludes);
-      // switch the new hosts that are to be included
-      includes = newIncludes;
+      switchIncludes = true;
     }
     if (!excludesFile.isEmpty()) {
-      Set<String> newExcludes = new HashSet<String>();
       readFileToSet("excluded", excludesFile, newExcludes);
+      switchExcludes = true;
+    }
+
+    if (switchIncludes) {
+      // switch the new hosts that are to be included
+      includes = newIncludes;
+    }
+    if (switchExcludes) {
+      // switch the excluded hosts
+      excludes = newExcludes;
+    }
+  }
+
+  @Private
+  public synchronized void refresh(InputStream inFileInputStream,
+      InputStream exFileInputStream) throws IOException {
+    LOG.info("Refreshing hosts (include/exclude) list");
+    Set<String> newIncludes = new HashSet<String>();
+    Set<String> newExcludes = new HashSet<String>();
+    boolean switchIncludes = false;
+    boolean switchExcludes = false;
+    if (inFileInputStream != null) {
+      readFileToSetWithFileInputStream("included", includesFile,
+          inFileInputStream, newIncludes);
+      switchIncludes = true;
+    }
+    if (exFileInputStream != null) {
+      readFileToSetWithFileInputStream("excluded", excludesFile,
+          exFileInputStream, newExcludes);
+      switchExcludes = true;
+    }
+    if (switchIncludes) {
+      // switch the new hosts that are to be included
+      includes = newIncludes;
+    }
+    if (switchExcludes) {
       // switch the excluded hosts
       excludes = newExcludes;
     }