You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ek...@apache.org on 2012/10/10 20:35:32 UTC
svn commit: r1396722 [1/2] - in /giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/comm/netty/
src/main/java/org/apache/giraph/comm/netty/handler/
src/main/java/org/apache/giraph/com...
Author: ekoontz
Date: Wed Oct 10 18:35:31 2012
New Revision: 1396722
URL: http://svn.apache.org/viewvc?rev=1396722&view=rev
Log:
GIRAPH-211: Add secure authentication to Netty IPC
Added:
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java
giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
Modified:
giraph/trunk/pom.xml
giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java
giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Wed Oct 10 18:35:31 2012
@@ -534,8 +534,13 @@ under the License.
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <scope>provided</scope>
+ <version>3.1</version>
+ </dependency>
</dependencies>
-
<build>
<plugins>
<plugin>
@@ -553,7 +558,7 @@ under the License.
</executions>
<!-- profile: hadoop_0.20.203 -->
<configuration>
- <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
+ <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY</symbols>
</configuration>
</plugin>
</plugins>
@@ -578,6 +583,12 @@ under the License.
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <scope>provided</scope>
+ <version>3.1</version>
+ </dependency>
</dependencies>
<build>
@@ -597,7 +608,7 @@ under the License.
</executions>
<!-- profile: hadoop_1.0 -->
<configuration>
- <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
+ <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY</symbols>
</configuration>
</plugin>
</plugins>
@@ -647,6 +658,16 @@ under the License.
<configuration>
<!-- profile: hadoop_non_secure -->
<symbols>HADOOP_NON_SECURE,HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
+ <excludes>**/comm/SecureRPCCommunications.java,
+ **/comm/netty/SaslNettyClient.java,
+ **/comm/netty/SaslNettyServer.java,
+ **/comm/netty/handler/AuthorizeServerHandler.java,
+ **/comm/netty/handler/SaslClientHandler.java,
+ **/comm/netty/handler/SaslServerHandler.java,
+ **/comm/requests/SaslCompleteRequest.java,
+ **/comm/requests/SaslTokenMessageRequest.java,
+ **/comm/SaslConnectionTest.java,
+ **/hadoop/BspTokenSelector.java</excludes>
</configuration>
</plugin>
<plugin>
@@ -654,10 +675,6 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
- <excludes>
- <exclude>**/BspTokenSelector.java</exclude>
- <exclude>**/SecureRPCCommunications.java</exclude>
- </excludes>
<source>${compileSource}</source>
<target>${compileSource}</target>
</configuration>
@@ -706,7 +723,10 @@ under the License.
<excludes>
<exclude>BspTokenSelector.java</exclude>
<exclude>SecureRPCCommunications.java</exclude>
- </excludes>
+ </excludes>
+ </resource>
+ <resource>
+ <directory>${basedir}/src/main/java/org/apache/giraph/comm/netty</directory>
</resource>
</resources>
<plugins>
@@ -739,6 +759,16 @@ under the License.
<!-- profile: hadoop_facebook -->
<configuration>
<symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
+ <excludes>**/comm/SecureRPCCommunications.java,
+ **/comm/netty/SaslNettyClient.java,
+ **/comm/netty/SaslNettyServer.java,
+ **/comm/netty/handler/AuthorizeServerHandler.java,
+ **/comm/netty/handler/SaslClientHandler.java,
+ **/comm/netty/handler/SaslServerHandler.java,
+ **/comm/requests/SaslCompleteRequest.java,
+ **/comm/requests/SaslTokenMessageRequest.java,
+ **/comm/SaslConnectionTest.java,
+ **/hadoop/BspTokenSelector.java</excludes>
</configuration>
</plugin>
<plugin>
@@ -826,6 +856,69 @@ under the License.
</profile>
<profile>
+ <id>hadoop_2.0.1</id>
+ <activation>
+ <property>
+ <name>hadoop</name>
+ <value>2.0.1</value>
+ </property>
+ </activation>
+ <properties>
+ <hadoop.version>2.0.1-alpha</hadoop.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <profile>
+ <id>hadoop_2.0.2</id>
+ <activation>
+ <property>
+ <name>hadoop</name>
+ <value>2.0.2</value>
+ </property>
+ </activation>
+ <properties>
+ <hadoop.version>2.0.2-alpha</hadoop.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
<id>hadoop_trunk</id>
<activation>
<property>
Modified: giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java Wed Oct 10 18:35:31 2012
@@ -563,6 +563,14 @@ public class GiraphConfiguration extends
public static final String DEFAULT_ZOOKEEPER_SKIP_ACL = "yes";
/**
+ * Whether to use SASL with DIGEST and Hadoop Job Tokens to authenticate
+ * and authorize Netty BSP Clients to Servers.
+ */
+ public static final String AUTHENTICATE = "giraph.authenticate";
+ /** Default is not to do authentication and authorization with Netty. */
+ public static final boolean DEFAULT_AUTHENTICATE = false;
+
+ /**
* Constructor that creates the configuration
*/
public GiraphConfiguration() { }
@@ -841,4 +849,13 @@ public class GiraphConfiguration extends
}
return mapTasks;
}
+
+ /**
+ * Use authentication? (if supported)
+ *
+ * @return True if should authenticate, false otherwise
+ */
+ public boolean authenticate() {
+ return getBoolean(AUTHENTICATE, DEFAULT_AUTHENTICATE);
+ }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Wed Oct 10 18:35:31 2012
@@ -580,8 +580,18 @@ public abstract class BasicRPCCommunicat
return myAddress.getPort();
}
+ /**
+ * Connect all BSP workers with each other.
+ * @param authenticate ignored if using Hadoop RPC, since authentication is
+ * handled at the level of Hadoop RPC. Only when Netty is used is this
+ * significant.
+ */
@Override
+/*if[HADOOP_NON_SECURE]
public void setup() {
+else[HADOOP_NON_SECURE]*/
+ public void setup(boolean authenticate) {
+/*end[HADOOP_NON_SECURE]*/
try {
connectAllRPCProxys(this.jobId, this.jobToken);
} catch (IOException e) {
@@ -1297,6 +1307,15 @@ public abstract class BasicRPCCommunicat
}
}
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ @Override
+ public void authenticate() {
+ // nothing done here if using Hadoop RPC: authentication is
+ // handled at Hadoop library-level
+ }
+/*end[HADOOP_NON_SECURE]*/
+
@Override
public String getName() {
return myName;
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java Wed Oct 10 18:35:31 2012
@@ -19,11 +19,6 @@
package org.apache.giraph.comm;
import org.apache.giraph.graph.Vertex;
-/*if[HADOOP_NON_SECURE]
- else[HADOOP_NON_SECURE]*/
-import org.apache.giraph.hadoop.BspTokenSelector;
-import org.apache.hadoop.security.token.TokenInfo;
-/*end[HADOOP_NON_SECURE]*/
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.VersionedProtocol;
@@ -40,10 +35,6 @@ import java.io.IOException;
* @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-/*if[HADOOP_NON_SECURE]
- else[HADOOP_NON_SECURE]*/
-@TokenInfo(BspTokenSelector.class)
-/*end[HADOOP_NON_SECURE]*/
public interface CommunicationsInterface<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends VersionedProtocol {
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java Wed Oct 10 18:35:31 2012
@@ -46,7 +46,6 @@ import org.apache.giraph.bsp.Centralized
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.hadoop.BspPolicyProvider;
-
/*if[HADOOP_NON_INTERVERSIONED_RPC]
else[HADOOP_NON_INTERVERSIONED_RPC]*/
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -148,8 +147,7 @@ public class SecureRPCCommunications<I e
}
}
- // TODO: make munge tag more specific: just use HADOOP_1 maybe.
- /*if[HADOOP_1_AUTHORIZATION]
+/*if[HADOOP_1_SECURITY]
// Hadoop 1-style authorization.
Server server = RPC.getServer(this,
myAddress.getHostName(), myAddress.getPort(),
@@ -160,7 +158,7 @@ public class SecureRPCCommunications<I e
if (conf.getBoolean(hadoopSecurityAuthorization, false)) {
ServiceAuthorizationManager.refresh(conf, new BspPolicyProvider());
}
- else[HADOOP_1_AUTHORIZATION]*/
+else[HADOOP_1_SECURITY]*/
// Hadoop 2+-style authorization.
Server server = RPC.getServer(this,
myAddress.getHostName(), myAddress.getPort(),
@@ -172,7 +170,7 @@ public class SecureRPCCommunications<I e
ServiceAuthorizationManager sam = new ServiceAuthorizationManager();
sam.refresh(conf, new BspPolicyProvider());
}
- /*end[HADOOP_1_AUTHORIZATION]*/
+/*end[HADOOP_1_SECURITY]*/
return server;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java Wed Oct 10 18:35:31 2012
@@ -39,10 +39,21 @@ import java.io.IOException;
@SuppressWarnings("rawtypes")
public interface WorkerClient<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> {
+
/**
* Setup the client.
*/
+/*if[HADOOP_NON_SECURE]
void setup();
+else[HADOOP_NON_SECURE]*/
+ /**
+ * Setup the client.
+ *
+ * @param authenticate whether to SASL authenticate with server or not:
+ * set by giraph.authenticate configuration option.
+ */
+ void setup(boolean authenticate);
+/*end[HADOOP_NON_SECURE]*/
/**
* Fix changes to the workers and the mapping between partitions and
@@ -123,4 +134,14 @@ public interface WorkerClient<I extends
* @throws IOException
*/
void closeConnections() throws IOException;
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ /**
+ * Authenticates, as client, with another BSP worker, as server.
+ *
+ * @throws IOException
+ */
+ void authenticate() throws IOException;
+/*end[HADOOP_NON_SECURE]*/
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Wed Oct 10 18:35:31 2012
@@ -22,6 +22,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
@@ -38,6 +40,12 @@ import org.apache.giraph.comm.netty.hand
import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
import org.apache.giraph.comm.netty.handler.RequestEncoder;
import org.apache.giraph.comm.netty.handler.RequestInfo;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.netty.handler.SaslClientHandler;
+import org.apache.giraph.comm.requests.RequestType;
+import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
+/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.utils.TimedLogger;
import org.apache.hadoop.mapreduce.Mapper;
@@ -47,13 +55,15 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelLocal;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
-
import static org.jboss.netty.channel.Channels.pipeline;
/**
@@ -74,6 +84,12 @@ public class NettyClient {
public static final int MAX_REQUESTS_TO_LIST = 10;
/** 30 seconds to connect by default */
public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ /** Used to authenticate with other workers acting as servers */
+ public static final ChannelLocal<SaslNettyClient> SASL =
+ new ChannelLocal<SaslNettyClient>();
+/*end[HADOOP_NON_SECURE]*/
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyClient.class);
/** Context used to report progress */
@@ -234,31 +250,62 @@ public class NettyClient {
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = pipeline();
- pipeline.addLast("clientByteCounter", byteCounter);
- pipeline.addLast("responseFrameDecoder",
- new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
- pipeline.addLast("requestEncoder", new RequestEncoder(conf.getInt(
- GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
- GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
- pipeline.addLast("responseClientHandler",
- new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
-
- if (executionHandler != null) {
- pipeline.addAfter(handlerBeforeExecutionHandler,
- "executionHandler", executionHandler);
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ if (conf.authenticate()) {
+ LOG.info("Using Netty with authentication.");
+
+ // Our pipeline starts with just byteCounter, and then we use
+ // addLast() to incrementally add pipeline elements, so that we can
+ // name them for identification for removal or replacement after
+ // client is authenticated by server.
+ // After authentication is complete, the pipeline's SASL-specific
+ // functionality is removed, restoring the pipeline to exactly the
+ // same configuration as it would be without authentication.
+ ChannelPipeline pipeline = Channels.pipeline(
+ byteCounter);
+ // The following pipeline component is needed to decode the server's
+ // SASL tokens. It is replaced with a FixedLengthFrameDecoder (same
+ // as used with the non-authenticated pipeline) after authentication
+ // completes (as in non-auth pipeline below).
+ pipeline.addLast("length-field-based-frame-decoder",
+ new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
+ pipeline.addLast("request-encoder", new RequestEncoder(conf.getInt(
+ GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
+ GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+ // The following pipeline component responds to the server's SASL
+ // tokens with its own responses. Both client and server share the
+ // same Hadoop Job token, which is used to create the SASL tokens to
+ // authenticate with each other.
+ // After authentication finishes, this pipeline component is removed.
+ pipeline.addLast("sasl-client-handler",
+ new SaslClientHandler(conf));
+ pipeline.addLast("response-handler",
+ new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
+ return pipeline;
+ } else {
+ LOG.info("Using Netty without authentication.");
+/*end[HADOOP_NON_SECURE]*/
+ ChannelPipeline pipeline = pipeline();
+ pipeline.addLast("clientByteCounter", byteCounter);
+ pipeline.addLast("responseFrameDecoder",
+ new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
+ pipeline.addLast("requestEncoder", new RequestEncoder(conf.getInt(
+ GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
+ GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+ pipeline.addLast("responseClientHandler",
+ new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
+ if (executionHandler != null) {
+ pipeline.addAfter(handlerBeforeExecutionHandler,
+ "executionHandler", executionHandler);
+ }
+ return pipeline;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
}
- return pipeline;
+/*end[HADOOP_NON_SECURE]*/
}
});
-
- if (LOG.isInfoEnabled()) {
- LOG.info("NettyClient: Started client" +
- " with up to " + maxPoolSize + " threads" +
- " with sendBufferSize = " + sendBufferSize +
- " receiveBufferSize = " + receiveBufferSize +
- " maxRequestMilliseconds = " + maxRequestMilliseconds);
- }
}
/**
@@ -379,6 +426,86 @@ public class NettyClient {
}
}
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ /**
+ * Authenticate with all servers in addressChannelMap.
+ */
+ public void authenticate() {
+ LOG.info("authenticate: NettyClient starting authentication with " +
+ "servers.");
+ for (InetSocketAddress address: addressChannelMap.keySet()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("authenticate: Authenticating with address:" + address);
+ }
+ ChannelRotater channelRotater = addressChannelMap.get(address);
+ for (Channel channel: channelRotater.getChannels()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("authenticate: Authenticating with server on channel: " +
+ channel);
+ }
+ authenticateOnChannel(channelRotater.getTaskId(), channel);
+ }
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("authenticate: NettyClient successfully authenticated with " +
+ addressChannelMap.size() + " server" +
+ ((addressChannelMap.size() != 1) ? "s" : "") +
+ " - continuing with normal work.");
+ }
+ }
+
+ /**
+ * Authenticate with server connected at given channel.
+ *
+ * @param taskId Task id of the channel
+ * @param channel Connection to server to authenticate with.
+ */
+ private void authenticateOnChannel(Integer taskId, Channel channel) {
+ try {
+ SaslNettyClient saslNettyClient = SASL.get(channel);
+ if (SASL.get(channel) == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("authenticateOnChannel: Creating saslNettyClient now " +
+ "for channel: " + channel);
+ }
+ saslNettyClient = new SaslNettyClient();
+ SASL.set(channel, saslNettyClient);
+ }
+ if (!saslNettyClient.isComplete()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("authenticateOnChannel: Waiting for authentication " +
+ "to complete..");
+ }
+ SaslTokenMessageRequest saslTokenMessage = saslNettyClient.firstToken();
+ sendWritableRequest(taskId, (InetSocketAddress) channel
+ .getRemoteAddress(),
+ saslTokenMessage);
+ // We now wait for Netty's thread pool to communicate over this
+ // channel to authenticate with another worker acting as a server.
+ try {
+ synchronized (saslNettyClient.getAuthenticated()) {
+ while (!saslNettyClient.isComplete()) {
+ saslNettyClient.getAuthenticated().wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("authenticateOnChannel: Interrupted while waiting for " +
+ "authentication.");
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("authenticateOnChannel: Authentication on channel: " +
+ channel + " has completed successfully.");
+ }
+ } catch (IOException e) {
+ LOG.error("authenticateOnChannel: Failed to authenticate with server " +
+ "due to error: " + e);
+ }
+ return;
+ }
+/*end[HADOOP_NON_SECURE]*/
+
/**
* Stop the client.
*/
@@ -436,7 +563,7 @@ public class NettyClient {
// Get rid of the failed channel
addressChannelMap.get(remoteServer).removeLast();
if (LOG.isInfoEnabled()) {
- LOG.info("checkAndFixChannel: Fixing disconnected channel to " +
+ LOG.info("getNextChannel: Fixing disconnected channel to " +
remoteServer + ", open = " + channel.isOpen() + ", " +
"bound = " + channel.isBound());
}
@@ -446,24 +573,24 @@ public class NettyClient {
connectionFuture.awaitUninterruptibly();
if (connectionFuture.isSuccess()) {
if (LOG.isInfoEnabled()) {
- LOG.info("checkAndFixChannel: Connected to " + remoteServer + "!");
+ LOG.info("getNextChannel: Connected to " + remoteServer + "!");
}
addressChannelMap.get(remoteServer).addChannel(
connectionFuture.getChannel());
return connectionFuture.getChannel();
}
++reconnectFailures;
- LOG.warn("checkAndFixChannel: Failed to reconnect to " + remoteServer +
+ LOG.warn("getNextChannel: Failed to reconnect to " + remoteServer +
" on attempt " + reconnectFailures + " out of " +
maxConnectionFailures + " max attempts, sleeping for 5 secs",
connectionFuture.getCause());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
- LOG.warn("blockingConnect: Unexpected interrupted exception", e);
+ LOG.warn("getNextChannel: Unexpected interrupted exception", e);
}
}
- throw new IllegalStateException("checkAndFixChannel: Failed to connect " +
+ throw new IllegalStateException("getNextChannel: Failed to connect " +
"to " + remoteServer + " in " + reconnectFailures +
" connect attempts");
}
@@ -481,22 +608,30 @@ public class NettyClient {
if (clientRequestIdRequestInfoMap.isEmpty()) {
byteCounter.resetAll();
}
+ boolean registerRequest = true;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
+ registerRequest = false;
+ }
+/*end[HADOOP_NON_SECURE]*/
Channel channel = getNextChannel(remoteServer);
- request.setClientId(clientId);
- request.setRequestId(
- addressRequestIdGenerator.getNextRequestId(remoteServer));
-
RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
- RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
- new ClientRequestId(destWorkerId, request.getRequestId()),
- newRequestInfo);
- if (oldRequestInfo != null) {
- throw new IllegalStateException("sendWritableRequest: Impossible to " +
+ if (registerRequest) {
+ request.setClientId(clientId);
+ request.setRequestId(
+ addressRequestIdGenerator.getNextRequestId(remoteServer));
+ ClientRequestId clientRequestId =
+ new ClientRequestId(destWorkerId, request.getRequestId());
+ RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
+ clientRequestId, newRequestInfo);
+ if (oldRequestInfo != null) {
+ throw new IllegalStateException("sendWritableRequest: Impossible to " +
"have a previous request id = " + request.getRequestId() + ", " +
"request info of " + oldRequestInfo);
+ }
}
-
ChannelFuture writeFuture = channel.write(request);
newRequestInfo.setWriteFuture(writeFuture);
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Wed Oct 10 18:35:31 2012
@@ -24,18 +24,29 @@ import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.giraph.GiraphConfiguration;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
+/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.WorkerRequestReservedMap;
import org.apache.giraph.comm.netty.handler.RequestDecoder;
import org.apache.giraph.comm.netty.handler.RequestServerHandler;
+import org.apache.giraph.comm.netty.handler.ResponseEncoder;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.netty.handler.SaslServerHandler;
+/*end[HADOOP_NON_SECURE]*/
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelLocal;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
@@ -46,12 +57,23 @@ import org.jboss.netty.handler.execution
import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
import static org.jboss.netty.channel.Channels.pipeline;
+
/**
* This server uses Netty and will implement all Giraph communication
*/
public class NettyServer {
/** Default maximum thread pool size */
public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
+
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ /** Used to authenticate with netty clients */
+ public static final ChannelLocal<SaslNettyServer>
+ CHANNEL_SASL_NETTY_SERVERS =
+ new ChannelLocal<SaslNettyServer>();
+/*end[HADOOP_NON_SECURE]*/
+
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyServer.class);
/** Configuration */
@@ -70,6 +92,11 @@ public class NettyServer {
private final int tcpBacklog;
/** Factory for {@link RequestServerHandler} */
private final RequestServerHandler.Factory requestServerHandlerFactory;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ /** Factory for {@link SaslServerHandler} */
+ private SaslServerHandler.Factory saslServerHandlerFactory;
+/*end[HADOOP_NON_SECURE]*/
/** Server bootstrap */
private ServerBootstrap bootstrap;
/** Byte counter for this client */
@@ -99,7 +126,10 @@ public class NettyServer {
RequestServerHandler.Factory requestServerHandlerFactory) {
this.conf = conf;
this.requestServerHandlerFactory = requestServerHandlerFactory;
-
+ /*if[HADOOP_NON_SECURE]
+ else[HADOOP_NON_SECURE]*/
+ this.saslServerHandlerFactory = new SaslServerHandler.Factory();
+ /*end[HADOOP_NON_SECURE]*/
sendBufferSize = conf.getInt(
GiraphConfiguration.SERVER_SEND_BUFFER_SIZE,
GiraphConfiguration.DEFAULT_SERVER_SEND_BUFFER_SIZE);
@@ -156,6 +186,23 @@ public class NettyServer {
}
}
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ /**
+ * Constructor for creating the server
+ *
+ * @param conf Configuration to use
+ * @param requestServerHandlerFactory Factory for request handlers
+ * @param saslServerHandlerFactory Factory for SASL handlers
+ */
+ public NettyServer(ImmutableClassesGiraphConfiguration conf,
+ RequestServerHandler.Factory requestServerHandlerFactory,
+ SaslServerHandler.Factory saslServerHandlerFactory) {
+ this(conf, requestServerHandlerFactory);
+ this.saslServerHandlerFactory = saslServerHandlerFactory;
+ }
+/*end[HADOOP_NON_SECURE]*/
+
/**
* Start the server with the appropriate port
*/
@@ -173,31 +220,62 @@ public class NettyServer {
receiveBufferSize,
receiveBufferSize));
+ /**
+ * Pipeline setup: depends on whether configured to use authentication
+ * or not.
+ */
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = pipeline();
-
- pipeline.addLast("serverByteCounter", byteCounter);
- pipeline.addLast("requestFrameDecoder",
- new LengthFieldBasedFrameDecoder(
- 1024 * 1024 * 1024, 0, 4, 0, 4));
- pipeline.addLast("requestDecoder",
- new RequestDecoder(conf, byteCounter));
- pipeline.addLast("requestProcessor",
- requestServerHandlerFactory.newHandler(
- workerRequestReservedMap, conf));
- if (executionHandler != null) {
- pipeline.addAfter(handlerBeforeExecutionHandler,
- "executionHandler", executionHandler);
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ if (conf.authenticate()) {
+ LOG.info("start: Will use Netty pipeline with " +
+ "authentication and authorization of clients.");
+ // After a client authenticates, the two authentication-specific
+ // pipeline components SaslServerHandler and ResponseEncoder are
+ // removed, leaving the pipeline the same as in the non-authenticated
+ // configuration except for the presence of the Authorize component.
+ return Channels.pipeline(
+ byteCounter,
+ new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
+ new RequestDecoder(conf, byteCounter),
+ // Removed after authentication completes:
+ saslServerHandlerFactory.newHandler(conf),
+ new AuthorizeServerHandler(),
+ requestServerHandlerFactory.newHandler(workerRequestReservedMap,
+ conf),
+ // Removed after authentication completes:
+ new ResponseEncoder());
+ } else {
+ LOG.info("start: Using Netty without authentication.");
+/*end[HADOOP_NON_SECURE]*/
+ ChannelPipeline pipeline = pipeline();
+
+ pipeline.addLast("serverByteCounter", byteCounter);
+ pipeline.addLast("requestFrameDecoder",
+ new LengthFieldBasedFrameDecoder(
+ 1024 * 1024 * 1024, 0, 4, 0, 4));
+ pipeline.addLast("requestDecoder",
+ new RequestDecoder(conf, byteCounter));
+ pipeline.addLast("requestProcessor",
+ requestServerHandlerFactory.newHandler(
+ workerRequestReservedMap, conf));
+ if (executionHandler != null) {
+ pipeline.addAfter(handlerBeforeExecutionHandler,
+ "executionHandler", executionHandler);
+ }
+ return pipeline;
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
}
- return pipeline;
+/*end[HADOOP_NON_SECURE]*/
}
});
int taskId = conf.getTaskPartition();
int numTasks = conf.getInt("mapred.map.tasks", 1);
- // number of workers + 1 for master
+ // Number of workers + 1 for master
int numServers = conf.getInt(GiraphConfiguration.MAX_WORKERS, numTasks) + 1;
int portIncrementConstant =
(int) Math.pow(10, Math.ceil(Math.log10(numServers)));
@@ -220,7 +298,7 @@ public class NettyServer {
this.myAddress = new InetSocketAddress(localHostname, bindPort);
if (failFirstPortBindingAttempt && bindAttempts == 0) {
if (LOG.isInfoEnabled()) {
- LOG.info("NettyServer: Intentionally fail first " +
+ LOG.info("start: Intentionally fail first " +
"binding attempt as giraph.failFirstRpcPortBindAttempt " +
"is true, port " + bindPort);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Wed Oct 10 18:35:31 2012
@@ -69,7 +69,7 @@ public class NettyWorkerClient<I extends
/** Class logger */
private static final Logger LOG =
Logger.getLogger(NettyWorkerClient.class);
- /** signal for getInetSocketAddress() to use WorkerInfo's address */
+ /** Signal for getInetSocketAddress() to use WorkerInfo's address */
private static final int NO_PARTITION_ID = Integer.MIN_VALUE;
/** Hadoop configuration */
private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
@@ -470,8 +470,26 @@ public class NettyWorkerClient<I extends
nettyClient.stop();
}
+/*if[HADOOP_NON_SECURE]
@Override
public void setup() {
fixPartitionIdToSocketAddrMap();
}
+else[HADOOP_NON_SECURE]*/
+ @Override
+ public void setup(boolean authenticate) {
+ fixPartitionIdToSocketAddrMap();
+ if (authenticate) {
+ authenticate();
+ }
+ }
+/*end[HADOOP_NON_SECURE]*/
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ @Override
+ public void authenticate() {
+ nettyClient.authenticate();
+ }
+/*end[HADOOP_NON_SECURE]*/
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java Wed Oct 10 18:35:31 2012
@@ -31,6 +31,7 @@ import org.apache.giraph.graph.partition
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
import java.io.IOException;
@@ -46,6 +47,9 @@ import java.io.IOException;
public class NettyWorkerClientServer<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
implements WorkerClientServer<I, V, E, M> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(NettyWorkerClientServer.class);
/** Client that sends requests */
private final WorkerClient<I, V, E, M> client;
/** Server that processes requests */
@@ -65,8 +69,7 @@ public class NettyWorkerClientServer<I e
server = new NettyWorkerServer<I, V, E, M>(
configuration, service, context);
client = new NettyWorkerClient<I, V, E, M>(context,
- configuration, service,
- ((NettyWorkerServer<I, V, E, M>) server).getServerData());
+ configuration, service, server.getServerData());
}
@Override
@@ -123,9 +126,30 @@ public class NettyWorkerClientServer<I e
}
@Override
+/*if[HADOOP_NON_SECURE]
public void setup() {
client.fixPartitionIdToSocketAddrMap();
}
+else[HADOOP_NON_SECURE]*/
+ public void setup(boolean authenticateWithServer) {
+   // Resolve partition ids to socket addresses before any authentication
+   // traffic is attempted.
+   client.fixPartitionIdToSocketAddrMap();
+   if (authenticateWithServer) {
+     try {
+       client.authenticate();
+     } catch (IOException e) {
+       // Hand the exception to the logger as the throwable so the stack
+       // trace is recorded; setup still returns normally, as before.
+       LOG.error("setup: Failed to authenticate : " + e, e);
+     }
+   }
+ }
+/*end[HADOOP_NON_SECURE]*/
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+ @Override
+ public void authenticate() throws IOException {
+ client.authenticate();
+ }
+/*end[HADOOP_NON_SECURE]*/
@Override
public void prepareSuperstep() {
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyClient.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+
+/**
+ * Implements SASL logic for Giraph BSP worker clients.
+ *
+ * Wraps a {@link SaslClient} created for the {@code AuthMethod.DIGEST}
+ * mechanism, using the Hadoop job token as the credential.
+ */
+public class SaslNettyClient {
+  /** Class logger */
+  public static final Logger LOG = Logger.getLogger(SaslNettyClient.class);
+
+  /**
+   * Used to synchronize client requests: client's work-related requests must
+   * wait until SASL authentication completes.
+   */
+  private final Object authenticated = new Object();
+
+  /**
+   * Used to respond to server's counterpart, SaslServer with SASL tokens
+   * represented as byte arrays. Null if construction failed.
+   */
+  private SaslClient saslClient;
+
+  /**
+   * Create a SaslNettyClient for authentication with BSP servers.
+   *
+   * If the job token cannot be loaded or the SASL client cannot be created,
+   * the failure is logged and the SASL client is left unset.
+   */
+  public SaslNettyClient() {
+    try {
+      Token<? extends TokenIdentifier> token =
+          createJobToken(new Configuration());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SaslNettyClient: Creating SASL " +
+            AuthMethod.DIGEST.getMechanismName() +
+            " client to authenticate to service at " + token.getService());
+      }
+      saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST
+          .getMechanismName() }, null, null, SaslRpcServer.SASL_DEFAULT_REALM,
+          SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(token));
+    } catch (IOException e) {
+      // SaslException is an IOException, so this branch also covers a failure
+      // inside Sasl.createSaslClient(); log the cause so the two cases can be
+      // told apart.
+      LOG.error("SaslNettyClient: Could not obtain job token for Netty " +
+          "Client to use to authenticate with a Netty Server.", e);
+      saslClient = null;
+    }
+  }
+
+  /**
+   * Get the object that callers synchronize on while waiting for SASL
+   * authentication to complete.
+   *
+   * @return the synchronization object owned by this client
+   */
+  public Object getAuthenticated() {
+    return authenticated;
+  }
+
+  /**
+   * Obtain JobToken, which we'll use as a credential for SASL authentication
+   * when connecting to other Giraph BSPWorkers.
+   *
+   * @param conf Configuration
+   * @return a JobToken containing username and password so that client can
+   * authenticate with a server.
+   * @throws IOException if the token file location is not set in the
+   * environment
+   */
+  private Token<JobTokenIdentifier> createJobToken(Configuration conf)
+    throws IOException {
+    String localJobTokenFile = System.getenv().get(
+        UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+    if (localJobTokenFile != null) {
+      JobConf jobConf = new JobConf(conf);
+      Credentials credentials =
+          TokenCache.loadTokens(localJobTokenFile, jobConf);
+      return TokenCache.getJobToken(credentials);
+    } else {
+      throw new IOException("createJobToken: Cannot obtain authentication " +
+          "credentials for job: file: '" +
+          UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION + "' not found");
+    }
+  }
+
+  /**
+   * Used by authenticateOnChannel() to initiate SASL handshake with server.
+   * @return SaslTokenMessageRequest message to be sent to server.
+   * @throws IOException if no SASL client is available or the initial
+   * response cannot be computed
+   */
+  public SaslTokenMessageRequest firstToken()
+    throws IOException {
+    if (saslClient == null) {
+      // Construction failed earlier; report it through the declared
+      // exception instead of a NullPointerException.
+      throw new IOException("firstToken: No SASL client is available: " +
+          "SaslNettyClient was not initialized successfully.");
+    }
+    byte[] saslToken = new byte[0];
+    if (saslClient.hasInitialResponse()) {
+      saslToken = saslClient.evaluateChallenge(saslToken);
+    }
+    SaslTokenMessageRequest saslTokenMessage =
+        new SaslTokenMessageRequest();
+    saslTokenMessage.setSaslToken(saslToken);
+    return saslTokenMessage;
+  }
+
+  /**
+   * Check whether the underlying SASL exchange has finished.
+   *
+   * @return true if the SaslClient reports the exchange as complete
+   */
+  public boolean isComplete() {
+    return saslClient.isComplete();
+  }
+
+  /**
+   * Respond to server's SASL token.
+   * @param saslTokenMessage contains server's SASL token
+   * @return client's response SASL token, or null if the server's token
+   * could not be evaluated
+   */
+  public byte[] saslResponse(SaslTokenMessageRequest saslTokenMessage) {
+    try {
+      return saslClient.evaluateChallenge(saslTokenMessage.getSaslToken());
+    } catch (SaslException e) {
+      LOG.error("saslResponse: Failed to respond to SASL server's token:", e);
+      return null;
+    }
+  }
+
+  /**
+   * Implementation of javax.security.auth.callback.CallbackHandler
+   * that works with Hadoop JobTokens.
+   */
+  private static class SaslClientCallbackHandler implements CallbackHandler {
+    /** Generated username contained in JobToken */
+    private final String userName;
+    /** Generated password contained in JobToken */
+    private final char[] userPassword;
+
+    /**
+     * Set private members using token.
+     * @param token Hadoop JobToken.
+     */
+    public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+      this.userName = SaslNettyServer.encodeIdentifier(token.getIdentifier());
+      this.userPassword = SaslNettyServer.encodePassword(token.getPassword());
+    }
+
+    /**
+     * Implementation used to respond to SASL tokens from server.
+     *
+     * @param callbacks objects that indicate what credential information the
+     *                  server's SaslServer requires from the client.
+     * @throws UnsupportedCallbackException if a callback of an unrecognized
+     *                  type is passed in
+     */
+    @Override
+    public void handle(Callback[] callbacks)
+      throws UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          // Realm choice is not answered.
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "handle: Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL client callback: setting username: " +
+              userName);
+        }
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL client callback: setting userPassword");
+        }
+        pc.setPassword(userPassword);
+      }
+      if (rc != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL client callback: setting realm: " +
+              rc.getDefaultText());
+        }
+        // Accept whatever realm the server proposed as the default.
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm.netty;
+
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.classification.InterfaceStability;
+/*if[HADOOP_1_SECURITY]
+else[HADOOP_1_SECURITY]*/
+import org.apache.hadoop.ipc.StandbyException;
+/*end[HADOOP_1_SECURITY]*/
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.log4j.Logger;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+
+/**
+ * Encapsulates SASL server logic for Giraph BSP worker servers.
+ */
+public class SaslNettyServer extends SaslRpcServer {
+  /** Logger */
+  public static final Logger LOG = Logger.getLogger(SaslNettyServer.class);
+
+  /**
+   * Actual SASL work done by this object from javax.security.sasl.
+   * Initialized below in constructor.
+   */
+  private SaslServer saslServer;
+
+  /**
+   * Constructor
+   *
+   * @param secretManager supplied by SaslServerHandler.
+   */
+  public SaslNettyServer(JobTokenSecretManager secretManager) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SaslNettyServer: Secret manager is: " + secretManager);
+    }
+/*if[HADOOP_1_SECURITY]
+else[HADOOP_1_SECURITY]*/
+    try {
+      secretManager.checkAvailableForRead();
+    } catch (StandbyException e) {
+      LOG.error("SaslNettyServer: Could not read secret manager: " + e);
+    }
+/*end[HADOOP_1_SECURITY]*/
+
+    try {
+      SaslDigestCallbackHandler ch =
+          new SaslNettyServer.SaslDigestCallbackHandler(secretManager);
+      saslServer = Sasl.createSaslServer(SaslNettyServer.AuthMethod.DIGEST
+          .getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM,
+          SaslRpcServer.SASL_PROPS, ch);
+    } catch (SaslException e) {
+      // Keep the stack trace; saslServer stays unset in this case.
+      LOG.error("SaslNettyServer: Could not create SaslServer: " + e, e);
+    }
+  }
+
+  /**
+   * Check whether the underlying SASL exchange has finished.
+   *
+   * @return true if the SaslServer reports the exchange as complete
+   */
+  public boolean isComplete() {
+    return saslServer.isComplete();
+  }
+
+  /**
+   * Get the authorization ID reported by the SaslServer.
+   *
+   * @return authorization ID of the client
+   */
+  public String getUserName() {
+    return saslServer.getAuthorizationID();
+  }
+
+  /**
+   * Used by SaslTokenMessage::processToken() to respond to a client's SASL
+   * token.
+   *
+   * @param token Client's SASL token
+   * @return token to send back to the client; null if the SaslServer has
+   * nothing further to send or if the client's token could not be evaluated
+   */
+  public byte[] response(byte[] token) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("response: Responding to input token of length: " +
+            token.length);
+      }
+      byte[] retval = saslServer.evaluateResponse(token);
+      if (LOG.isDebugEnabled()) {
+        // evaluateResponse() may return null, so do not dereference it
+        // unconditionally just for logging.
+        LOG.debug("response: Response token length: " +
+            (retval == null ? 0 : retval.length));
+      }
+      return retval;
+    } catch (SaslException e) {
+      LOG.error("response: Failed to evaluate client token of length: " +
+          token.length + " : " + e, e);
+      return null;
+    }
+  }
+
+  /**
+   * Encode a byte[] identifier as a Base64-encoded string.
+   *
+   * @param identifier identifier to encode
+   * @return Base64-encoded string
+   */
+  static String encodeIdentifier(byte[] identifier) {
+    return new String(Base64.encodeBase64(identifier));
+  }
+
+  /**
+   * Encode a password as a base64-encoded char[] array.
+   * @param password as a byte array.
+   * @return password as a char array.
+   */
+  static char[] encodePassword(byte[] password) {
+    return new String(Base64.encodeBase64(password)).toCharArray();
+  }
+
+  /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+  @InterfaceStability.Evolving
+  public static class SaslDigestCallbackHandler implements CallbackHandler {
+    /** Used to authenticate the clients */
+    private final JobTokenSecretManager secretManager;
+
+    /**
+     * Constructor
+     *
+     * @param secretManager used to authenticate clients
+     */
+    public SaslDigestCallbackHandler(
+        JobTokenSecretManager secretManager) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SaslDigestCallback: Creating SaslDigestCallback handler " +
+            "with secret manager: " + secretManager);
+      }
+      this.secretManager = secretManager;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void handle(Callback[] callbacks) throws IOException,
+      UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          continue; // realm is ignored
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "handle: Unrecognized SASL DIGEST-MD5 Callback");
+        }
+      }
+      if (pc != null) {
+        // The name callback carries the encoded token identifier; the
+        // password for it is looked up in the secret manager.
+        JobTokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(),
+            secretManager);
+        char[] password =
+            encodePassword(secretManager.retrievePassword(tokenIdentifier));
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("handle: SASL server DIGEST-MD5 callback: setting " +
+              "password for client: " + tokenIdentifier.getUser());
+        }
+        pc.setPassword(password);
+      }
+      if (ac != null) {
+        String authid = ac.getAuthenticationID();
+        String authzid = ac.getAuthorizationID();
+        // Authorized only when the client acts under its own identity.
+        ac.setAuthorized(authid.equals(authzid));
+        if (ac.isAuthorized()) {
+          if (LOG.isDebugEnabled()) {
+            String username =
+                getIdentifier(authzid, secretManager).getUser().getUserName();
+            LOG.debug("handle: SASL server DIGEST-MD5 callback: setting " +
+                "canonicalized client ID: " + username);
+          }
+          ac.setAuthorizedID(authzid);
+        }
+      }
+    }
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/AuthorizeServerHandler.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.SaslNettyServer;
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+/**
+ * Upstream gate that lets a client's request through only when that
+ * client's channel has a completed SASL authentication on record.
+ */
+public class AuthorizeServerHandler extends
+    SimpleChannelUpstreamHandler {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(AuthorizeServerHandler.class);
+
+  /**
+   * Constructor.
+   */
+  public AuthorizeServerHandler() {
+  }
+
+  /**
+   * Decide whether an incoming message may continue up the pipeline.
+   *
+   * The message is forwarded only if a SaslNettyServer is registered for
+   * the channel and reports that authentication is complete; otherwise a
+   * warning is logged and the message is not forwarded.
+   *
+   * @param ctx Channel handler context
+   * @param e Incoming message event
+   */
+  @Override
+  public void messageReceived(
+      ChannelHandlerContext ctx, MessageEvent e) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("messageReceived: Got " + e.getMessage().getClass());
+    }
+    // Look up the per-channel SASL state kept by the server.
+    SaslNettyServer channelAuthenticator =
+        NettyServer.CHANNEL_SASL_NETTY_SERVERS.get(ctx.getChannel());
+    if (channelAuthenticator == null) {
+      // No SASL state at all for this channel: refuse.
+      LOG.warn("messageReceived: This client is *NOT* authorized to perform " +
+          "this action since there's no saslNettyServer to " +
+          "authenticate the client: " +
+          "refusing to perform requested action: " + e.getMessage());
+    } else if (!channelAuthenticator.isComplete()) {
+      // Handshake started but never finished: refuse, do not forward.
+      LOG.warn("messageReceived: This client is *NOT* authorized to perform " +
+          "this action because SASL authentication did not complete: " +
+          "refusing to perform requested action: " + e.getMessage());
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("messageReceived: authenticated client: " +
+            channelAuthenticator.getUserName() +
+            " is authorized to do request " +
+            "on server.");
+      }
+      // Authenticated: pass the request on to the next pipeline component.
+      ctx.sendUpstream(e);
+    }
+  }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java Wed Oct 10 18:35:31 2012
@@ -81,7 +81,7 @@ public class RequestEncoder extends OneT
if (LOG.isDebugEnabled()) {
LOG.debug("encode: Client " + writableRequest.getClientId() + ", " +
"requestId " + writableRequest.getRequestId() +
- ", size = " + encodedBuffer.writerIndex() +
+ ", size = " + encodedBuffer.writerIndex() + ", " +
writableRequest.getType() + " took " +
SystemTime.getInstance().getNanosecondsSince(
startEncodingNanoseconds) + " ns");
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.comm.requests.RequestType;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+/**
+ * How a server should respond to a client. Currently only used for
+ * responding to client's SASL messages, and removed after client
+ * authenticates.
+ */
+public class ResponseEncoder extends OneToOneEncoder {
+  /** Class logger. */
+  private static final Logger LOG = Logger.getLogger(ResponseEncoder.class);
+  /** Holds the place of the message length until known. */
+  private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
+
+  /**
+   * Serialize a {@link WritableRequest} into a buffer laid out as: a 4-byte
+   * length (not counting the length field itself), one byte holding the
+   * ordinal of the request type, then the request's own serialized form.
+   *
+   * @param ctx Channel handler context
+   * @param channel Channel the message is written to
+   * @param msg Message to encode; must be a WritableRequest
+   * @return buffer containing the encoded message
+   * @throws IllegalArgumentException if msg is not a WritableRequest
+   * @throws Exception if writing the request fails
+   */
+  @Override
+  protected Object encode(ChannelHandlerContext ctx,
+      Channel channel, Object msg) throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("encode(" + ctx + "," + channel + "," + msg + ")");
+    }
+
+    if (!(msg instanceof WritableRequest)) {
+      // msg may be null here, so do not call getClass() on it blindly.
+      throw new IllegalArgumentException(
+          "encode: cannot encode message of type " +
+          (msg == null ? null : msg.getClass()) +
+          " since it is not an instance of an implementation of " +
+          " WritableRequest.");
+    }
+    WritableRequest writableRequest = (WritableRequest) msg;
+    ChannelBufferOutputStream outputStream =
+        new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
+            10, ctx.getChannel().getConfig().getBufferFactory()));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("encode: Encoding a message of type " + msg.getClass());
+    }
+    outputStream.write(LENGTH_PLACEHOLDER);
+
+    // write type of object.
+    outputStream.writeByte(writableRequest.getType().ordinal());
+
+    // write the object itself.
+    writableRequest.write(outputStream);
+
+    outputStream.flush();
+
+    // Set the correct size at the end.
+    ChannelBuffer encodedBuffer = outputStream.buffer();
+    encodedBuffer.setInt(0, encodedBuffer.writerIndex() - 4);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("encode: Encoded a message of type " + msg.getClass() +
+          ", size = " + encodedBuffer.writerIndex());
+    }
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]*/
+    if (writableRequest.getType() == RequestType.SASL_COMPLETE_REQUEST) {
+      // We are sending to the client a SASL_COMPLETE response (created by
+      // the SaslServer handler). The SaslServer handler has removed itself
+      // from the pipeline after creating this response, and now it's time for
+      // the ResponseEncoder to remove itself also.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("encode: Removing ResponseEncoder handler: no longer " +
+            "needed, since client: " + ctx.getChannel().getRemoteAddress() +
+            " has completed authenticating.");
+      }
+      ctx.getPipeline().remove(this);
+    }
+/*end[HADOOP_NON_SECURE]*/
+    return encodedBuffer;
+  }
+}
+
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.SaslNettyClient;
+import org.apache.giraph.comm.requests.RequestType;
+import org.apache.giraph.comm.requests.SaslCompleteRequest;
+import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
+import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
+
+import java.io.IOException;
+
+/**
+ * Client-side Netty pipeline component that allows authentication with a
+ * server.
+ *
+ * While installed, it decodes each upstream message itself, answers SASL
+ * tokens sent by the server, and removes itself from the pipeline when the
+ * server's SASL-complete message arrives.
+ */
+public class SaslClientHandler extends OneToOneDecoder {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(SaslClientHandler.class);
+  /** Configuration */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   */
+  public SaslClientHandler(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Handle one upstream event during the SASL handshake. Non-message events
+   * are forwarded unchanged. A message is decoded and is then either the
+   * server's SASL-complete notice (notify the channel's SASL client monitor
+   * and switch the pipeline to fixed-length response decoding) or a SASL
+   * token (compute the reply and write it back, unless the reply is null).
+   *
+   * @param ctx Channel handler context
+   * @param evt Upstream event
+   * @throws Exception if no SASL client is registered for the channel, if
+   *   the message cannot be decoded or has an unexpected type, or if the
+   *   server reports completion while the local SASL client is not complete
+   */
+  @Override
+  public void handleUpstream(
+    ChannelHandlerContext ctx, ChannelEvent evt)
+    throws Exception {
+    if (!(evt instanceof MessageEvent)) {
+      ctx.sendUpstream(evt);
+      return;
+    }
+    MessageEvent e = (MessageEvent) evt;
+    Object originalMessage = e.getMessage();
+    Object decodedMessage = decode(ctx, ctx.getChannel(), originalMessage);
+    // Generate SASL response to server using Channel-local SASL client.
+    SaslNettyClient saslNettyClient = NettyClient.SASL.get(ctx.getChannel());
+    if (saslNettyClient == null) {
+      throw new Exception("handleUpstream: saslNettyClient was unexpectedly " +
+          "null for channel: " + ctx.getChannel());
+    }
+    if (decodedMessage.getClass() == SaslCompleteRequest.class) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("handleUpstream: Server has sent us the SaslComplete " +
+            "message. Allowing normal work to proceed.");
+      }
+      // NOTE(review): the notification is sent before the isComplete()
+      // check below; presumably so that a waiting thread is released even
+      // when that check fails - confirm against the waiter in NettyClient.
+      synchronized (saslNettyClient.getAuthenticated()) {
+        saslNettyClient.getAuthenticated().notify();
+      }
+      if (!saslNettyClient.isComplete()) {
+        LOG.error("handleUpstream: Server returned a Sasl-complete message, " +
+            "but as far as we can tell, we are not authenticated yet.");
+        throw new Exception("handleUpstream: Server returned a " +
+            "Sasl-complete message, but as far as " +
+            "we can tell, we are not authenticated yet.");
+      }
+      // Remove SaslClientHandler and replace LengthFieldBasedFrameDecoder
+      // from client pipeline.
+      ctx.getPipeline().remove(this);
+      ctx.getPipeline().replace("length-field-based-frame-decoder",
+          "fixed-length-frame-decoder",
+          new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
+      return;
+    }
+    // Anything other than a SASL token is a protocol violation at this
+    // point; report it explicitly instead of failing on the cast below.
+    if (!(decodedMessage instanceof SaslTokenMessageRequest)) {
+      throw new IllegalStateException("handleUpstream: Expected a SASL " +
+          "token message from server, but got: " + decodedMessage.getClass());
+    }
+    SaslTokenMessageRequest serverToken =
+        (SaslTokenMessageRequest) decodedMessage;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("handleUpstream: Responding to server's token of length: " +
+          serverToken.getSaslToken().length);
+    }
+    // Generate SASL response (but we only actually send the response if it's
+    // non-null).
+    byte[] responseToServer = saslNettyClient.saslResponse(serverToken);
+    if (responseToServer == null) {
+      // If we generate a null response, then authentication has completed (if
+      // not, warn), and return without sending a response back to the server.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("handleUpstream: Response to server is null: " +
+            "authentication should now be complete.");
+      }
+      if (!saslNettyClient.isComplete()) {
+        LOG.warn("handleUpstream: Generated a null response, " +
+            "but authentication is not complete.");
+      }
+      return;
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("handleUpstream: Response to server token has length:" +
+            responseToServer.length);
+      }
+    }
+    // Construct a message containing the SASL response and send it to the
+    // server.
+    SaslTokenMessageRequest saslResponse =
+        new SaslTokenMessageRequest(responseToServer);
+    ctx.getChannel().write(saslResponse);
+  }
+
+  /**
+   * Decode a server message into the {@link WritableRequest} it carries.
+   * The first byte is read as a {@link RequestType} ordinal; the remainder
+   * of the buffer is handed to the new request's readFields().
+   *
+   * @param ctx Channel handler context
+   * @param channel Channel the message arrived on
+   * @param msg Message to decode; must be a {@link ChannelBuffer}
+   * @return Deserialized request
+   * @throws Exception if msg is not a ChannelBuffer, if the type byte does
+   *   not name a known request type, or if the request body cannot be read
+   */
+  @Override
+  protected Object decode(ChannelHandlerContext ctx,
+      Channel channel, Object msg) throws Exception {
+    if (!(msg instanceof ChannelBuffer)) {
+      throw new IllegalStateException("decode: Got illegal message " + msg);
+    }
+    // Decode msg into an object whose class C implements WritableRequest:
+    // C will be either SaslTokenMessage or SaslComplete.
+    //
+    // 1. Convert message to a stream that can be decoded.
+    ChannelBuffer buffer = (ChannelBuffer) msg;
+    ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer);
+    // 2. Get first byte: message type. readByte() is signed, so reject both
+    // negative and too-large values before indexing the enum table.
+    int enumValue = inputStream.readByte();
+    RequestType[] requestTypes = RequestType.values();
+    if (enumValue < 0 || enumValue >= requestTypes.length) {
+      throw new IllegalStateException("decode: Got unknown request type " +
+          enumValue + " from server:" + channel.getRemoteAddress());
+    }
+    RequestType type = requestTypes[enumValue];
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("decode: Got a response of type " + type + " from server:" +
+          channel.getRemoteAddress());
+    }
+    // 3. Create object of the type determined in step 2.
+    Class<? extends WritableRequest> writableRequestClass =
+        type.getRequestClass();
+    WritableRequest serverResponse =
+        ReflectionUtils.newInstance(writableRequestClass, conf);
+    // 4. Deserialize the inputStream's contents into the newly-constructed
+    // serverResponse object. A read failure is rethrown: a partially
+    // populated response must not take part in the handshake.
+    try {
+      serverResponse.readFields(inputStream);
+    } catch (IOException e) {
+      LOG.error("decode: Exception when trying to read server response", e);
+      throw e;
+    }
+    // serverResponse can now be used in the next stage in pipeline.
+    return serverResponse;
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.SaslNettyServer;
+import org.apache.giraph.comm.requests.RequestType;
+import org.apache.giraph.comm.requests.SaslCompleteRequest;
+import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Generate SASL response tokens to client SASL tokens, allowing clients to
+ * authenticate themselves with this server.
+ */
+public class SaslServerHandler extends
+    SimpleChannelUpstreamHandler {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SaslServerHandler.class);
+
+  // TODO: Move out into a separate, dedicated handler: ("FirstRequestHandler")
+  // or similar.
+  /** Already closed first request? */
+  private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
+
+  /** Close connection on first request (used for simulating failure) */
+  private final boolean closeFirstRequest;
+  /** Used to store Hadoop Job Tokens to authenticate clients. */
+  private JobTokenSecretManager secretManager;
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   * @throws IOException if the secret manager cannot be set up, for example
+   *   because the token file location is not defined in the environment
+   */
+  public SaslServerHandler(
+      Configuration conf) throws IOException {
+    SaslNettyServer.init(conf);
+    setupSecretManager(conf);
+    closeFirstRequest = conf.getBoolean(
+        GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
+        GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
+  }
+
+  /**
+   * Process one request from a client. A SASL token request is answered on
+   * the same channel and, once the channel's SASL server reports completion,
+   * followed by a SASL-complete message, after which this handler removes
+   * itself from the pipeline. Any other request is passed upstream.
+   *
+   * @param ctx Channel handler context
+   * @param e Message event carrying a {@link WritableRequest}
+   */
+  @Override
+  public void messageReceived(
+      ChannelHandlerContext ctx, MessageEvent e) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("messageReceived: Got " + e.getMessage().getClass());
+    }
+
+    WritableRequest writableRequest = (WritableRequest) e.getMessage();
+    // Simulate a closed connection on the first request (if desired)
+    // TODO: Move out into a separate, dedicated handler.
+    if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
+      LOG.info("messageReceived: Simulating closing channel on first " +
+          "request " + writableRequest.getRequestId() + " from " +
+          writableRequest.getClientId());
+      ALREADY_CLOSED_FIRST_REQUEST = true;
+      ctx.getChannel().close();
+      return;
+    }
+
+    if (writableRequest.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
+      // initialize server-side SASL functionality, if we haven't yet
+      // (in which case we are looking at the first SASL message from the
+      // client).
+      SaslNettyServer saslNettyServer =
+          NettyServer.CHANNEL_SASL_NETTY_SERVERS.get(ctx.getChannel());
+      if (saslNettyServer == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No saslNettyServer for " + ctx.getChannel() +
+              " yet; creating now, with secret manager: " + secretManager);
+        }
+        saslNettyServer = new SaslNettyServer(secretManager);
+        NettyServer.CHANNEL_SASL_NETTY_SERVERS.set(ctx.getChannel(),
+            saslNettyServer);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found existing saslNettyServer on server:" +
+              ctx.getChannel().getLocalAddress() + " for client " +
+              ctx.getChannel().getRemoteAddress());
+        }
+      }
+
+      ((SaslTokenMessageRequest) writableRequest).processToken(saslNettyServer);
+      // Send response to client.
+      ctx.getChannel().write(writableRequest);
+      if (saslNettyServer.isComplete()) {
+        // If authentication of client is complete, we will also send a
+        // SASL-Complete message to the client.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL authentication is complete for client with " +
+              "username: " + saslNettyServer.getUserName());
+        }
+        SaslCompleteRequest saslComplete = new SaslCompleteRequest();
+        ctx.getChannel().write(saslComplete);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing SaslServerHandler from pipeline since SASL " +
+              "authentication is complete.");
+        }
+        ctx.getPipeline().remove(this);
+      }
+      // do not send upstream to other handlers: no further action needs to be
+      // done for SASL_TOKEN_MESSAGE_REQUEST requests.
+      return;
+    } else {
+      // Client should not be sending other-than-SASL messages before
+      // SaslServerHandler has removed itself from the pipeline. Such non-SASL
+      // requests will be denied by the Authorize channel handler (the next
+      // handler upstream in the server pipeline) if SASL authentication has
+      // not completed.
+      LOG.warn("Sending upstream an unexpected non-SASL message : " +
+          writableRequest);
+      ctx.sendUpstream(e);
+    }
+  }
+
+  /**
+   * Load Hadoop Job Token into secret manager.
+   *
+   * Only tokens whose kind matches {@link JobTokenIdentifier} are decoded
+   * and registered; tokens of any other kind in the credentials file are
+   * skipped.
+   *
+   * @param conf Configuration
+   * @throws IOException if the token file location is not defined in the
+   *   environment, or if the tokens cannot be loaded or decoded
+   */
+  private void setupSecretManager(Configuration conf) throws IOException {
+    secretManager = new JobTokenSecretManager();
+    String localJobTokenFile = System.getenv().get(
+        UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+    if (localJobTokenFile == null) {
+      throw new IOException("Could not find job credentials: environment " +
+          "variable: " + UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION +
+          " was not defined.");
+    }
+    JobConf jobConf = new JobConf(conf);
+
+    // Find the JobTokenIdentifiers among all the tokens available in the
+    // jobTokenFile and store them in the secretManager.
+    Credentials credentials =
+        TokenCache.loadTokens(localJobTokenFile, jobConf);
+    Collection<Token<? extends TokenIdentifier>> collection =
+        credentials.getAllTokens();
+    for (Token<? extends TokenIdentifier> token: collection) {
+      if (!isJobToken(token)) {
+        // The identifier bytes of a token of another kind are not a
+        // serialized JobTokenIdentifier, so decoding them as one would
+        // either fail or produce a meaningless job id.
+        continue;
+      }
+      JobTokenIdentifier jobTokenIdentifier =
+          decodeIdentifier(token, JobTokenIdentifier.class);
+      // Safe: the token's kind was checked against JobTokenIdentifier above.
+      @SuppressWarnings("unchecked")
+      Token<JobTokenIdentifier> theToken =
+          (Token<JobTokenIdentifier>) token;
+      secretManager.addTokenForJob(
+          jobTokenIdentifier.getJobId().toString(), theToken);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("loaded JobToken credentials: " + credentials + " from " +
+          "localJobTokenFile: " + localJobTokenFile);
+    }
+  }
+
+  /**
+   * Check whether a token's declared kind is the job token kind.
+   *
+   * @param token Token to inspect
+   * @return true if the token's kind equals the kind reported by
+   *   {@link JobTokenIdentifier}
+   */
+  private static boolean isJobToken(Token<? extends TokenIdentifier> token) {
+    return new JobTokenIdentifier().getKind().equals(token.getKind());
+  }
+
+  /**
+   * Deserialize a token's identifier bytes into a new instance of the given
+   * identifier class.
+   * Hadoop 2.0.0 (and older Hadoop2 versions? (verify)) need this.
+   * Hadoop 2.0.1 and newer have a Token.decodeIdentifier() method and do not
+   * need this. Might want to create a munge flag to distinguish 2.0.0 vs newer.
+   *
+   * @param token the token to decode into a TokenIdentifier
+   * @param cls the subclass of TokenIdentifier to decode the token into.
+   * @param <T> the identifier type to produce
+   * @return the token identifier.
+   * @throws IOException if the identifier bytes cannot be read as a T
+   */
+  private <T extends TokenIdentifier> T decodeIdentifier(
+      Token<? extends TokenIdentifier> token,
+      Class<T> cls) throws IOException {
+    T tokenIdentifier = ReflectionUtils.newInstance(cls, null);
+    DataInputStream in = new DataInputStream(
+        new ByteArrayInputStream(token.getIdentifier()));
+    try {
+      tokenIdentifier.readFields(in);
+    } finally {
+      in.close();
+    }
+    return tokenIdentifier;
+  }
+
+  /** Factory for {@link SaslServerHandler} */
+  public static class Factory {
+    /**
+     * Constructor
+     */
+    public Factory() {
+    }
+    /**
+     * Create new {@link SaslServerHandler}
+     *
+     * @param conf Configuration to use
+     * @return New {@link SaslServerHandler}
+     * @throws IOException if the handler's constructor fails
+     */
+    public SaslServerHandler newHandler(
+        Configuration conf) throws IOException {
+      return new SaslServerHandler(conf);
+    }
+  }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java Wed Oct 10 18:35:31 2012
@@ -22,6 +22,16 @@ package org.apache.giraph.comm.requests;
* Type of the request
*/
public enum RequestType {
+ /*if[HADOOP_NON_SECURE]
+ else[HADOOP_NON_SECURE]*/
+ /** Exchange authentication information between clients and servers */
+ SASL_TOKEN_MESSAGE_REQUEST(SaslTokenMessageRequest.class),
+ /**
+ * Used by servers to acknowledge SASL authentication completion with
+ * client, so client can modify its pipeline afterwards.
+ */
+ SASL_COMPLETE_REQUEST(SaslCompleteRequest.class),
+ /*end[HADOOP_NON_SECURE]*/
/** Sending vertices request */
SEND_VERTEX_REQUEST(SendVertexRequest.class),
/** Sending a partition of messages for next superstep */
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslCompleteRequest.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Reply from server to client after SASL authentication completes.
+ * This request adds no fields of its own to what {@link WritableRequest}
+ * handles: both serialization hooks below are intentionally empty.
+ */
+public class SaslCompleteRequest extends WritableRequest {
+  /**
+   * Constructor used for reflection and sending.
+   */
+  public SaslCompleteRequest() {
+  }
+
+  @Override
+  public void writeRequest(DataOutput output) throws IOException {
+    // Nothing to write: this request has no fields of its own.
+  }
+
+  @Override
+  public void readFieldsRequest(DataInput input) throws IOException {
+    // Nothing to read: this request has no fields of its own.
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SASL_COMPLETE_REQUEST;
+  }
+}