You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/05/16 17:03:33 UTC
svn commit: r1483399 - in /accumulo/branches/1.5:
core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
server/src/main/java/org/apache/accumulo/server/Accumulo.java
Author: ecn
Date: Thu May 16 15:03:32 2013
New Revision: 1483399
URL: http://svn.apache.org/r1483399
Log:
ACCUMULO-804 initial attempt to make Accumulo binary compatible with Hadoop 2.0
Modified:
accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java
Modified: accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java?rev=1483399&r1=1483398&r2=1483399&view=diff
==============================================================================
--- accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java (original)
+++ accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java Thu May 16 15:03:32 2013
@@ -21,6 +21,7 @@ import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.reflect.Method;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.spi.SelectorProvider;
@@ -31,12 +32,21 @@ import org.apache.thrift.transport.TTran
public class TTimeoutTransport {
+  /**
+   * Calls the static method NetUtils.getInputStream(Socket, long) through reflection.
+   * <p>
+   * The method is resolved by name and parameter types at run time instead of being linked at
+   * compile time (ACCUMULO-804, binary compatibility with hadoop 2.0).
+   *
+   * @param socket
+   *          the socket whose input stream is wanted
+   * @param timeout
+   *          handed unchanged to NetUtils.getInputStream; create() passes its timeoutMillis here
+   * @return whatever NetUtils.getInputStream returns, cast to InputStream
+   * @throws IOException
+   *           if NetUtils.getInputStream itself fails with an IOException
+   * @throws RuntimeException
+   *           if the method cannot be looked up or invoked, or fails with anything other than an IOException
+   */
+  private static InputStream getInputStream(Socket socket, long timeout) throws IOException {
+    try {
+      Method m = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
+      return (InputStream) m.invoke(null, socket, timeout);
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      // Method.invoke wraps whatever the target threw. Hand an IOException back unwrapped so that
+      // callers of create(), which is declared to throw IOException, still get to handle it.
+      if (e.getCause() instanceof IOException)
+        throw (IOException) e.getCause();
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      // lookup or access failure: the expected NetUtils method is not usable on this classpath
+      throw new RuntimeException(e);
+    }
+  }
+
public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
Socket socket = SelectorProvider.provider().openSocketChannel().socket();
socket.setSoLinger(false, 0);
socket.setTcpNoDelay(true);
socket.connect(addr);
- InputStream input = new BufferedInputStream(NetUtils.getInputStream(socket, timeoutMillis), 1024 * 10);
+ InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
return new TIOStreamTransport(input, output);
}
Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java?rev=1483399&r1=1483398&r2=1483399&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java Thu May 16 15:03:32 2013
@@ -20,6 +20,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map.Entry;
@@ -199,10 +200,7 @@ public class Accumulo {
long sleep = 1000;
while (true) {
try {
- if (!(fs instanceof DistributedFileSystem))
- break;
- DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(CachedConfiguration.getInstance());
- if (!dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET))
+ if (!isInSafeMode(fs))
break;
log.warn("Waiting for the NameNode to leave safemode");
} catch (IOException ex) {
@@ -214,4 +212,49 @@ public class Accumulo {
}
log.info("Connected to HDFS");
}
+
+  /**
+   * Asks HDFS whether it is currently in safe mode.
+   * <p>
+   * This is the reflective equivalent of <code>dfs.setSafeMode(SafeModeAction.SAFEMODE_GET)</code>. The
+   * SafeModeAction enum is looked up at run time because it is nested in HdfsConstants on hadoop 2.0 and
+   * in FSConstants on hadoop 1.0.
+   *
+   * @param fs
+   *          the file system in use; anything that is not a DistributedFileSystem is reported as not in safe mode
+   * @return the value returned by setSafeMode(SAFEMODE_GET)
+   * @throws IOException
+   *           if obtaining the file system or the setSafeMode call itself fails with an IOException
+   * @throws RuntimeException
+   *           if the constants class, the SafeModeAction enum, SAFEMODE_GET or the setSafeMode method cannot be resolved
+   */
+  private static boolean isInSafeMode(FileSystem fs) throws IOException {
+    if (!(fs instanceof DistributedFileSystem))
+      return false;
+    // NOTE(review): the type check is made on fs, but the cast is applied to a freshly fetched
+    // FileSystem; kept as in the code this replaces -- confirm both always refer to the same file system.
+    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(CachedConfiguration.getInstance());
+
+    Class<?> constantClass;
+    try {
+      // hadoop 2.0
+      constantClass = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants");
+    } catch (ClassNotFoundException ex) {
+      // hadoop 1.0
+      try {
+        constantClass = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants");
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException("Cannot figure out the right class for Constants", e);
+      }
+    }
+
+    // find the nested SafeModeAction enum inside whichever constants class was found
+    Class<?> safeModeAction = null;
+    for (Class<?> klass : constantClass.getDeclaredClasses()) {
+      if (klass.getSimpleName().equals("SafeModeAction")) {
+        safeModeAction = klass;
+        break;
+      }
+    }
+    if (safeModeAction == null) {
+      throw new RuntimeException("Cannot find SafeModeAction in constants class");
+    }
+
+    // pick the SAFEMODE_GET constant by name
+    Object get = null;
+    for (Object obj : safeModeAction.getEnumConstants()) {
+      if (obj.toString().equals("SAFEMODE_GET")) {
+        get = obj;
+        break;
+      }
+    }
+    if (get == null) {
+      throw new RuntimeException("cannot find SAFEMODE_GET");
+    }
+
+    try {
+      Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+      return (Boolean) setSafeMode.invoke(dfs, get);
+    } catch (java.lang.reflect.InvocationTargetException ex) {
+      // Method.invoke wraps whatever setSafeMode threw. Rethrow an IOException as itself so the
+      // caller's catch (IOException) around this call still sees it, as it did with the direct call.
+      if (ex.getCause() instanceof IOException)
+        throw (IOException) ex.getCause();
+      throw new RuntimeException("setSafeMode failed", ex);
+    } catch (Exception ex) {
+      // lookup or access failure; keep the cause so the real reason is visible
+      throw new RuntimeException("cannot find method setSafeMode", ex);
+    }
+  }
}