You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/11/26 16:48:08 UTC

[19/39] git commit: ACCUMULO-804 initial attempt to make accumulo binary compatible with 2.0

ACCUMULO-804 initial attempt to make accumulo binary compatible with 2.0

git-svn-id: https://svn.apache.org/repos/asf/accumulo/branches/1.5@1483399 13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit d6c612d087e7f922c9935888cf443d4a9f1999c2)

Reason: Hadoop2 Compat
Author: Eric C. Newton <ec...@apache.org>
Ref: ACCUMULO-1792

Signed-off-by: Eric Newton <er...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2c83ca33
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2c83ca33
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2c83ca33

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 2c83ca337ac54e3c71aff1596bcf6ea04aea6491
Parents: 1b1334f
Author: Jonathan M Hsieh <jo...@cloudera.com>
Authored: Wed May 29 14:23:01 2013 -0700
Committer: Eric Newton <er...@gmail.com>
Committed: Mon Nov 25 16:06:42 2013 -0500

----------------------------------------------------------------------
 .../accumulo/core/util/TTimeoutTransport.java   | 12 ++++-
 .../org/apache/accumulo/server/Accumulo.java    | 51 ++++++++++++++++++--
 2 files changed, 58 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c83ca33/src/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java b/src/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
index 3c1fa6a..0aebc39 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
@@ -21,6 +21,7 @@ import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Method;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.nio.channels.spi.SelectorProvider;
@@ -31,12 +32,21 @@ import org.apache.thrift.transport.TTransport;
 
 public class TTimeoutTransport {
   
+  /**
+   * Obtains an input stream for {@code socket} by calling the static {@code NetUtils.getInputStream(Socket, long)}
+   * through reflection instead of linking against it directly.
+   * 
+   * @param socket
+   *          socket to read from
+   * @param timeout
+   *          timeout value handed to NetUtils unchanged
+   * @return the object returned by NetUtils, cast to {@link InputStream}
+   * @throws IOException
+   *           if the NetUtils call itself fails with an IOException; it is rethrown unwrapped so that callers, which
+   *           already declare IOException, see the real I/O failure
+   * @throws RuntimeException
+   *           if the method cannot be looked up or invoked, or fails with something other than an IOException
+   */
+  private static InputStream getInputStream(Socket socket, long timeout) throws IOException {
+    try {
+      Method m = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
+      return (InputStream) m.invoke(null, socket, timeout);
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      // invoke() wraps whatever the target threw; surface the real failure rather than the reflective wrapper
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException)
+        throw (IOException) cause;
+      if (cause instanceof RuntimeException)
+        throw (RuntimeException) cause;
+      throw new RuntimeException(cause == null ? e : cause);
+    } catch (Exception e) {
+      // lookup or access problems: nothing the caller can recover from
+      throw new RuntimeException(e);
+    }
+  }
+  
   public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
+    // Opens a socket through the default SelectorProvider, connects it to addr and wraps its input and
+    // output streams (10K buffers each) in a thrift TIOStreamTransport. The input stream is obtained via
+    // the reflective getInputStream helper; the output stream still calls NetUtils directly.
+    // If any step after opening the socket fails, the socket is closed before the exception propagates,
+    // so a failed connect does not leave the socket open.
     Socket socket = SelectorProvider.provider().openSocketChannel().socket();
-    socket.setSoLinger(false, 0);
-    socket.setTcpNoDelay(true);
-    socket.connect(addr);
-    InputStream input = new BufferedInputStream(NetUtils.getInputStream(socket, timeoutMillis), 1024 * 10);
-    OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
-    return new TIOStreamTransport(input, output);
+    boolean handedOff = false;
+    try {
+      socket.setSoLinger(false, 0);
+      socket.setTcpNoDelay(true);
+      socket.connect(addr);
+      InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
+      OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
+      TTransport transport = new TIOStreamTransport(input, output);
+      handedOff = true;
+      return transport;
+    } finally {
+      if (!handedOff) {
+        try {
+          socket.close();
+        } catch (IOException ignored) {
+          // best effort: the failure that brought us here is the one worth reporting
+        }
+      }
+    }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c83ca33/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
index b2feb5c..32462b7 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Map.Entry;
@@ -212,10 +213,7 @@ public class Accumulo {
     while (true) {
       try {
         FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
-        if (!(fs instanceof DistributedFileSystem))
-          break;
-        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(CachedConfiguration.getInstance());
-        if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
+        if (!isInSafeMode(fs))
           break;
         log.warn("Waiting for the NameNode to leave safemode");
       } catch (IOException ex) {
@@ -227,4 +225,49 @@ public class Accumulo {
     }
     log.info("Connected to HDFS");
   }
+
+  /**
+   * Asks a distributed file system whether it is currently in safe mode.
+   * <p>
+   * The direct form of this call is {@code dfs.setSafeMode(SafeModeAction.SAFEMODE_GET)}. The class that declares
+   * {@code SafeModeAction} is {@code HdfsConstants} on hadoop 2.0 and {@code FSConstants} on hadoop 1.0, so the enum,
+   * its {@code SAFEMODE_GET} constant and the {@code setSafeMode} method are all resolved reflectively at run time.
+   * 
+   * @param fs
+   *          the file system to query
+   * @return {@code false} when {@code fs} is not a {@link DistributedFileSystem}; otherwise the value returned by
+   *         {@code setSafeMode(SAFEMODE_GET)}
+   * @throws IOException
+   *           if the setSafeMode call itself fails with an IOException; it is rethrown unwrapped so the caller's
+   *           existing IOException handling applies
+   * @throws RuntimeException
+   *           if the constants class, the SafeModeAction enum, SAFEMODE_GET or the setSafeMode method cannot be
+   *           resolved or invoked
+   */
+  private static boolean isInSafeMode(FileSystem fs) throws IOException {
+    if (!(fs instanceof DistributedFileSystem))
+      return false;
+    // Cast the instance that was just type-checked rather than a second lookup from FileSystem.get
+    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+
+    Class<?> constantClass;
+    try {
+      // hadoop 2.0
+      constantClass = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants");
+    } catch (ClassNotFoundException ex) {
+      // hadoop 1.0
+      try {
+        constantClass = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants");
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException("Cannot figure out the right class for Constants", e);
+      }
+    }
+
+    Class<?> safeModeAction = null;
+    for (Class<?> klass : constantClass.getDeclaredClasses()) {
+      if (klass.getSimpleName().equals("SafeModeAction")) {
+        safeModeAction = klass;
+        break;
+      }
+    }
+    if (safeModeAction == null) {
+      throw new RuntimeException("Cannot find SafeModeAction in constants class");
+    }
+
+    Object get = null;
+    // getEnumConstants() returns null when the class is not an enum; treat that like a missing constant
+    Object[] actions = safeModeAction.getEnumConstants();
+    if (actions != null) {
+      for (Object action : actions) {
+        if (((Enum<?>) action).name().equals("SAFEMODE_GET")) {
+          get = action;
+          break;
+        }
+      }
+    }
+    if (get == null) {
+      throw new RuntimeException("cannot find SAFEMODE_GET");
+    }
+
+    try {
+      Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+      return (Boolean) setSafeMode.invoke(dfs, get);
+    } catch (java.lang.reflect.InvocationTargetException ex) {
+      // setSafeMode ran and threw: report what it threw, not the reflective wrapper
+      Throwable cause = ex.getCause();
+      if (cause instanceof IOException)
+        throw (IOException) cause;
+      if (cause instanceof RuntimeException)
+        throw (RuntimeException) cause;
+      throw new RuntimeException("setSafeMode failed", cause == null ? ex : cause);
+    } catch (NoSuchMethodException ex) {
+      throw new RuntimeException("cannot find method setSafeMode", ex);
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException("cannot call method setSafeMode", ex);
+    }
+  }
 }