You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ek...@apache.org on 2012/10/10 20:35:32 UTC
svn commit: r1396722 [2/2] - in /giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/comm/netty/
src/main/java/org/apache/giraph/comm/netty/handler/
src/main/java/org/apache/giraph/com...
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.netty.SaslNettyServer;
+import org.apache.log4j.Logger;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Send and receive SASL tokens.
+ */
+public class SaslTokenMessageRequest extends WritableRequest {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SaslTokenMessageRequest.class);
+
+ /** Used for client or server's token to send or receive from each other. */
+ private byte[] token;
+
+ /**
+ * Constructor used for reflection only.
+ */
+ public SaslTokenMessageRequest() { }
+
+ /**
+ * Constructor used to send request.
+ *
+ * @param token the SASL token, generated by a SaslClient or SaslServer.
+ */
+ public SaslTokenMessageRequest(byte[] token) {
+ this.token = token;
+ }
+
+ /**
+ * Read accessor for SASL token
+ *
+ * @return saslToken SASL token
+ */
+ public byte[] getSaslToken() {
+ return token;
+ }
+
+ /**
+ * Write accessor for SASL token
+ *
+ * @param token SASL token
+ */
+ public void setSaslToken(byte[] token) {
+ this.token = token;
+ }
+
+ @Override
+ public RequestType getType() {
+ return RequestType.SASL_TOKEN_MESSAGE_REQUEST;
+ }
+
+ @Override
+ public void readFieldsRequest(DataInput input) throws IOException {
+ int tokenSize = input.readInt();
+ token = new byte[tokenSize];
+ input.readFully(token);
+ }
+
+ /**
+ * Update server's token in response to the SASL token received from
+ * client. Updated token is sent to client by
+ * SaslServerHandler.messageReceived().
+ *
+ * @param saslNettyServer used to create response.
+ */
+
+ public void processToken(SaslNettyServer saslNettyServer) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processToken: With nettyServer: " + saslNettyServer +
+ " and token length: " + token.length);
+ }
+ token = saslNettyServer.response(token);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processToken: Response token's length is:" + token.length);
+ }
+ }
+
+ @Override
+ public void writeRequest(DataOutput output) throws IOException {
+ output.writeInt(token.length);
+ output.write(token);
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1396722&r1=1396721&r2=1396722&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Oct 10 18:35:31 2012
@@ -594,7 +594,11 @@ else[HADOOP_NON_SECURE]*/
workerGraphPartitioner.updatePartitionOwners(
getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+/*if[HADOOP_NON_SECURE]
commService.setup();
+else[HADOOP_NON_SECURE]*/
+ commService.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
// Ensure the InputSplits are ready for processing before processing
while (true) {
@@ -1037,11 +1041,11 @@ else[HADOOP_NON_SECURE]*/
}
} catch (KeeperException e) {
// Cleaning up, it's okay to fail after cleanup is successful
- LOG.error("cleanup: Got KeeperException on notifcation " +
+ LOG.error("cleanup: Got KeeperException on notification " +
"to master about cleanup", e);
} catch (InterruptedException e) {
// Cleaning up, it's okay to fail after cleanup is successful
- LOG.error("cleanup: Got InterruptedException on notifcation " +
+ LOG.error("cleanup: Got InterruptedException on notification " +
"to master about cleanup", e);
}
try {
@@ -1239,7 +1243,7 @@ else[HADOOP_NON_SECURE]*/
++loadedPartitions;
} catch (IOException e) {
throw new RuntimeException(
- "loadCheckpoing: Failed to get partition owner " +
+ "loadCheckpoint: Failed to get partition owner " +
partitionOwner, e);
}
}
@@ -1269,7 +1273,11 @@ else[HADOOP_NON_SECURE]*/
// Communication service needs to setup the connections prior to
// processing vertices
+/*if[HADOOP_NON_SECURE]
commService.setup();
+else[HADOOP_NON_SECURE]*/
+ commService.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
}
/**
Added: giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java?rev=1396722&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java Wed Oct 10 18:35:31 2012
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import com.google.common.collect.Maps;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.messages.SimpleMessageStore;
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.handler.SaslServerHandler;
+import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+/**
+ * Netty connection with mocked authentication.
+ */
+public class SaslConnectionTest {
+ /** Class configuration */
+ private ImmutableClassesGiraphConfiguration conf;
+
+ public static class IntVertex extends EdgeListVertex<IntWritable,
+ IntWritable, IntWritable, IntWritable> {
+ @Override
+ public void compute(Iterable<IntWritable> messages) throws IOException {
+ }
+ }
+
+ @Before
+ public void setUp() {
+ GiraphConfiguration tmpConfig = new GiraphConfiguration();
+ tmpConfig.setVertexClass(IntVertex.class);
+ tmpConfig.setBoolean(GiraphConfiguration.AUTHENTICATE, true);
+ conf = new ImmutableClassesGiraphConfiguration(tmpConfig);
+ }
+
+ /**
+ * Test connecting a single client to a single server.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void connectSingleClientServer() throws IOException {
+ @SuppressWarnings("rawtypes")
+ Context context = mock(Context.class);
+ when(context.getConfiguration()).thenReturn(conf);
+
+ ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+ new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+ conf,
+ SimpleMessageStore.newFactory(
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
+ context);
+
+ SaslServerHandler.Factory mockedSaslServerFactory =
+ Mockito.mock(SaslServerHandler.Factory.class);
+
+ SaslServerHandler mockedSaslServerHandler =
+ Mockito.mock(SaslServerHandler.class);
+ when(mockedSaslServerFactory.newHandler(conf)).
+ thenReturn(mockedSaslServerHandler);
+
+ NettyServer server =
+ new NettyServer(conf,
+ new WorkerRequestServerHandler.Factory(serverData),
+ mockedSaslServerFactory);
+ server.start();
+
+ NettyClient client = new NettyClient(context, conf);
+ Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
+ addressIdMap.put(server.getMyAddress(), -1);
+ client.connectAllAddresses(addressIdMap);
+
+ client.stop();
+ server.stop();
+ }
+}