You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ek...@apache.org on 2012/10/10 20:35:32 UTC

svn commit: r1396722 [2/2] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/comm/netty/ src/main/java/org/apache/giraph/comm/netty/handler/ src/main/java/org/apache/giraph/com...

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.netty.SaslNettyServer;
+import org.apache.log4j.Logger;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Send and receive SASL tokens.
+ */
+public class SaslTokenMessageRequest extends WritableRequest {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SaslTokenMessageRequest.class);
+
+  /** Used for client or server's token to send or receive from each other. */
+  private byte[] token;
+
+  /**
+   * Constructor used for reflection only.
+   */
+  public SaslTokenMessageRequest() { }
+
+ /**
+   * Constructor used to send request.
+   *
+   * @param token the SASL token, generated by a SaslClient or SaslServer.
+   */
+  public SaslTokenMessageRequest(byte[] token) {
+    this.token = token;
+  }
+
+  /**
+   * Read accessor for SASL token
+   *
+   * @return saslToken SASL token
+   */
+  public byte[] getSaslToken() {
+    return token;
+  }
+
+  /**
+   * Write accessor for SASL token
+   *
+   * @param token SASL token
+   */
+  public void setSaslToken(byte[] token) {
+    this.token = token;
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SASL_TOKEN_MESSAGE_REQUEST;
+  }
+
+  @Override
+  public void readFieldsRequest(DataInput input) throws IOException {
+    int tokenSize = input.readInt();
+    token = new byte[tokenSize];
+    input.readFully(token);
+  }
+
+  /**
+   * Update server's token in response to the SASL token received from
+   * client. Updated token is sent to client by
+   * SaslServerHandler.messageReceived().
+   *
+   * @param saslNettyServer used to create response.
+   */
+
+  public void processToken(SaslNettyServer saslNettyServer) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("processToken:  With nettyServer: " + saslNettyServer +
+          " and token length: " + token.length);
+    }
+    token = saslNettyServer.response(token);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("processToken: Response token's length is:" + token.length);
+    }
+  }
+
+  @Override
+  public void writeRequest(DataOutput output) throws IOException {
+    output.writeInt(token.length);
+    output.write(token);
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Oct 10 18:35:31 2012
@@ -594,7 +594,11 @@ else[HADOOP_NON_SECURE]*/
     workerGraphPartitioner.updatePartitionOwners(
         getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
 
+/*if[HADOOP_NON_SECURE]
     commService.setup();
+else[HADOOP_NON_SECURE]*/
+    commService.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
 
     // Ensure the InputSplits are ready for processing before processing
     while (true) {
@@ -1037,11 +1041,11 @@ else[HADOOP_NON_SECURE]*/
       }
     } catch (KeeperException e) {
       // Cleaning up, it's okay to fail after cleanup is successful
-      LOG.error("cleanup: Got KeeperException on notifcation " +
+      LOG.error("cleanup: Got KeeperException on notification " +
           "to master about cleanup", e);
     } catch (InterruptedException e) {
       // Cleaning up, it's okay to fail after cleanup is successful
-      LOG.error("cleanup: Got InterruptedException on notifcation " +
+      LOG.error("cleanup: Got InterruptedException on notification " +
           "to master about cleanup", e);
     }
     try {
@@ -1239,7 +1243,7 @@ else[HADOOP_NON_SECURE]*/
           ++loadedPartitions;
         } catch (IOException e) {
           throw new RuntimeException(
-              "loadCheckpoing: Failed to get partition owner " +
+              "loadCheckpoint: Failed to get partition owner " +
                   partitionOwner, e);
         }
       }
@@ -1269,7 +1273,11 @@ else[HADOOP_NON_SECURE]*/
 
     // Communication service needs to setup the connections prior to
     // processing vertices
+/*if[HADOOP_NON_SECURE]
     commService.setup();
+else[HADOOP_NON_SECURE]*/
+    commService.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
   }
 
   /**

Added: giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import com.google.common.collect.Maps;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.messages.SimpleMessageStore;
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.handler.SaslServerHandler;
+import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+/**
+ * Netty connection with mocked authentication.
+ */
+public class SaslConnectionTest {
+  /** Class configuration */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  public static class IntVertex extends EdgeListVertex<IntWritable,
+      IntWritable, IntWritable, IntWritable> {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+    }
+  }
+
+  @Before
+  public void setUp() {
+    GiraphConfiguration tmpConfig = new GiraphConfiguration();
+    tmpConfig.setVertexClass(IntVertex.class);
+    tmpConfig.setBoolean(GiraphConfiguration.AUTHENTICATE, true);
+    conf = new ImmutableClassesGiraphConfiguration(tmpConfig);
+  }
+
+  /**
+   * Test connecting a single client to a single server.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void connectSingleClientServer() throws IOException {
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
+
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf,
+            SimpleMessageStore.newFactory(
+                MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
+            context);
+
+    SaslServerHandler.Factory mockedSaslServerFactory =
+        Mockito.mock(SaslServerHandler.Factory.class);
+
+    SaslServerHandler mockedSaslServerHandler =
+        Mockito.mock(SaslServerHandler.class);
+    when(mockedSaslServerFactory.newHandler(conf)).
+        thenReturn(mockedSaslServerHandler);
+
+    NettyServer server =
+        new NettyServer(conf,
+            new WorkerRequestServerHandler.Factory(serverData),
+            mockedSaslServerFactory);
+    server.start();
+
+    NettyClient client = new NettyClient(context, conf);
+    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+    addressIdMap.put(server.getMyAddress(), -1);
+    client.connectAllAddresses(addressIdMap);
+
+    client.stop();
+    server.stop();
+  }
+}