You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/04 01:32:00 UTC
svn commit: r1393814 - in /giraph/trunk: ./
src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/
Author: aching
Date: Wed Oct 3 23:32:00 2012
New Revision: 1393814
URL: http://svn.apache.org/viewvc?rev=1393814&view=rev
Log:
GIRAPH-212: Security is busted since GIRAPH-168
Added:
giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java
- copied, changed from r1393275, giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/pom.xml
giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Oct 3 23:32:00 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-212: Security is busted since GIRAPH-168. (ekoontz via
+ aching)
+
GIRAPH-315: giraph-site.xml isn't read on time. (majakabiljo via
aching)
Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Wed Oct 3 23:32:00 2012
@@ -551,8 +551,9 @@ under the License.
</goals>
</execution>
</executions>
+ <!-- profile: hadoop_0.20.203 -->
<configuration>
- <symbols>HADOOP_NON_SASL_RPC,HADOOP_NON_INTERVERSIONED_RPC</symbols>
+ <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
</configuration>
</plugin>
</plugins>
@@ -594,8 +595,9 @@ under the License.
</goals>
</execution>
</executions>
+ <!-- profile: hadoop_1.0 -->
<configuration>
- <symbols>HADOOP_NON_SASL_RPC,HADOOP_NON_INTERVERSIONED_RPC</symbols>
+ <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
</configuration>
</plugin>
</plugins>
@@ -620,6 +622,12 @@ under the License.
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <scope>provided</scope>
+ <version>3.2.1</version>
+ </dependency>
</dependencies>
<build>
<plugins>
@@ -637,7 +645,8 @@ under the License.
</execution>
</executions>
<configuration>
- <symbols>HADOOP_NON_SECURE,HADOOP_NON_SASL_RPC,HADOOP_NON_INTERVERSIONED_RPC</symbols>
+ <!-- profile: hadoop_non_secure -->
+ <symbols>HADOOP_NON_SECURE,HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
</configuration>
</plugin>
<plugin>
@@ -647,6 +656,7 @@ under the License.
<configuration>
<excludes>
<exclude>**/BspTokenSelector.java</exclude>
+ <exclude>**/SecureRPCCommunications.java</exclude>
</excludes>
<source>${compileSource}</source>
<target>${compileSource}</target>
@@ -695,6 +705,7 @@ under the License.
<directory>${basedir}/src/main/java/org/apache/giraph/hadoop</directory>
<excludes>
<exclude>BspTokenSelector.java</exclude>
+ <exclude>SecureRPCCommunications.java</exclude>
</excludes>
</resource>
</resources>
@@ -706,6 +717,7 @@ under the License.
<configuration>
<excludes>
<exclude>**/BspTokenSelector.java</exclude>
+ <exclude>**/SecureRPCCommunications.java</exclude>
</excludes>
<source>${compileSource}</source>
<target>${compileSource}</target>
@@ -724,8 +736,9 @@ under the License.
</goals>
</execution>
</executions>
+ <!-- profile: hadoop_facebook -->
<configuration>
- <symbols>HADOOP_NON_SECURE,HADOOP_NON_SASL_RPC</symbols>
+ <symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
</configuration>
</plugin>
<plugin>
@@ -745,6 +758,9 @@ under the License.
</build>
</profile>
+ <!-- Help keep future Hadoop versions munge-free:
+ none of the profiles below uses munge flags, so do not introduce
+ munge flags into any of them. -->
<profile>
<id>hadoop_0.23</id>
<activation>
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Wed Oct 3 23:32:00 2012
@@ -33,6 +33,10 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.utils.MemoryUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+/*if[HADOOP_NON_INTERVERSIONED_RPC]
+else[HADOOP_NON_INTERVERSIONED_RPC]*/
+import org.apache.hadoop.ipc.ProtocolSignature;
+/*end[HADOOP_NON_INTERVERSIONED_RPC]*/
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.mapreduce.Mapper;
@@ -60,11 +64,6 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-/*if[HADOOP_NON_INTERVERSIONED_RPC]
-else[HADOOP_NON_INTERVERSIONED_RPC]*/
-import org.apache.hadoop.ipc.ProtocolSignature;
-/*end[HADOOP_NON_INTERVERSIONED_RPC]*/
-
/**
* Basic RPC communications object that implements the lower level operations
* for RPC communication.
@@ -237,7 +236,7 @@ public abstract class BasicRPCCommunicat
/**
* Constructor.
*
- * @param peerConnection Connection to send the messsages to.
+ * @param peerConnection Connection to send the messages to.
* @param context Context of the mapper.
*/
PeerFlushExecutor(PeerConnection peerConnection,
@@ -1302,4 +1301,11 @@ public abstract class BasicRPCCommunicat
public String getName() {
return myName;
}
+
+ /**
+ * Not supported by the Hadoop RPC implementation.
+ *
+ * @return Never returns normally.
+ * @throws IllegalStateException Always; per the message, this accessor
+ * should only be used by Netty RPC.
+ */
+ @Override
+ public ServerData<I, V, E, M> getServerData() {
+ // The first literal ends with a space so the concatenated message
+ // does not run "called" and "while" together.
+ throw new IllegalStateException(
+ "getServerData() called while using Hadoop RPC: " +
+ "should only be used by Netty RPC.");
+ }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java Wed Oct 3 23:32:00 2012
@@ -22,34 +22,12 @@ import java.io.IOException;
import java.net.InetSocketAddress;
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.security.token.Token;
-/*end[HADOOP_NON_SECURE]*/
-
import org.apache.log4j.Logger;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-import org.apache.giraph.hadoop.BspPolicyProvider;
-/*end[HADOOP_NON_SECURE]*/
import org.apache.hadoop.conf.Configuration;
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-import org.apache.hadoop.io.Text;
-/*end[HADOOP_NON_SECURE]*/
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RPC;
@@ -67,11 +45,7 @@ import org.apache.hadoop.mapreduce.Mappe
@SuppressWarnings("rawtypes")
public class RPCCommunications<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- /*if[HADOOP_NON_SASL_RPC]
extends BasicRPCCommunications<I, V, E, M, Object> {
- else[HADOOP_NON_SASL_RPC]*/
- extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
- /*end[HADOOP_NON_SASL_RPC]*/
/** Class logger */
public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
@@ -99,23 +73,7 @@ public class RPCCommunications<I extends
*
* @return Job token.
*/
- protected
- /*if[HADOOP_NON_SECURE]
- Object createJobToken() throws IOException {
- else[HADOOP_NON_SECURE]*/
- Token<JobTokenIdentifier> createJobToken() throws IOException {
- /*end[HADOOP_NON_SECURE]*/
- /*if[HADOOP_NON_SECURE]
- else[HADOOP_NON_SECURE]*/
- String localJobTokenFile = System.getenv().get(
- UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
- if (localJobTokenFile != null) {
- JobConf jobConf = new JobConf(conf);
- Credentials credentials =
- TokenCache.loadTokens(localJobTokenFile, jobConf);
- return TokenCache.getJobToken(credentials);
- }
- /*end[HADOOP_NON_SECURE]*/
+ protected Object createJobToken() throws IOException {
return null;
}
@@ -131,34 +89,12 @@ public class RPCCommunications<I extends
@Override
protected Server getRPCServer(
InetSocketAddress myAddress, int numHandlers, String jobId,
- /*if[HADOOP_NON_SASL_RPC]
Object jt) throws IOException {
- return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
- numHandlers, false, conf);
- else[HADOOP_NON_SASL_RPC]*/
- Token<JobTokenIdentifier> jt) throws IOException {
- @SuppressWarnings("deprecation")
- JobTokenSecretManager jobTokenSecretManager =
- new JobTokenSecretManager();
- if (jt != null) { //could be null in the case of some unit tests
- jobTokenSecretManager.addTokenForJob(jobId, jt);
- if (LOG.isInfoEnabled()) {
- LOG.info("getRPCServer: Added jobToken " + jt);
- }
- }
- Server server = RPC.getServer(RPCCommunications.class, this,
- myAddress.getHostName(), myAddress.getPort(),
- numHandlers, false, conf, jobTokenSecretManager);
- String hadoopSecurityAuthorization =
- ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
- if (conf.getBoolean(hadoopSecurityAuthorization, false)) {
- server.refreshServiceAcl(conf, new BspPolicyProvider());
- }
+ Server server = RPC.getServer(this, myAddress.getHostName(),
+ myAddress.getPort(), numHandlers, false, conf);
return server;
- /*end[HADOOP_NON_SASL_RPC]*/
}
-
/**
* Get the RPC proxy.
*
@@ -169,47 +105,10 @@ public class RPCCommunications<I extends
*/
@SuppressWarnings("unchecked")
protected CommunicationsInterface<I, V, E, M> getRPCProxy(
- final InetSocketAddress addr,
- String jobId,
- /*if[HADOOP_NON_SASL_RPC]
- Object jt)
- else[HADOOP_NON_SASL_RPC]*/
- Token<JobTokenIdentifier> jt)
- /*end[HADOOP_NON_SASL_RPC]*/
+ final InetSocketAddress addr, String jobId, Object jt)
throws IOException, InterruptedException {
final Configuration config = new Configuration(conf);
- /*if[HADOOP_NON_SASL_RPC]
- return (CommunicationsInterface<I, V, E, M>)RPC.getProxy(
- CommunicationsInterface.class, VERSION_ID, addr, config);
- else[HADOOP_NON_SASL_RPC]*/
- if (jt == null) {
- return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
- CommunicationsInterface.class, VERSION_ID, addr, config);
- }
- jt.setService(new Text(addr.getAddress().getHostAddress() + ":" +
- addr.getPort()));
- UserGroupInformation current = UserGroupInformation.getCurrentUser();
- current.addToken(jt);
- UserGroupInformation owner =
- UserGroupInformation.createRemoteUser(jobId);
- owner.addToken(jt);
- return
- owner.doAs(new PrivilegedExceptionAction<
- CommunicationsInterface<I, V, E, M>>() {
- @Override
- @SuppressWarnings("unchecked")
- public CommunicationsInterface<I, V, E, M> run() throws Exception {
- // All methods in CommunicationsInterface will be used for RPC
- return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
- CommunicationsInterface.class, VERSION_ID, addr, config);
- }
- });
- /*end[HADOOP_NON_SASL_RPC]*/
- }
-
- @Override
- public ServerData<I, V, E, M> getServerData() {
- throw new IllegalStateException(
- "getServerData: Tried to get ServerData while using RPC");
+ return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
+ CommunicationsInterface.class, VERSION_ID, addr, config);
}
}
Copied: giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java (from r1393275, giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java?p2=giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java&p1=giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java&r1=1393275&r2=1393814&rev=1393814&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java Wed Oct 3 23:32:00 2012
@@ -21,11 +21,17 @@ package org.apache.giraph.comm;
import java.io.IOException;
import java.net.InetSocketAddress;
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
import java.security.PrivilegedExceptionAction;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -33,28 +39,18 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.Token;
-/*end[HADOOP_NON_SECURE]*/
import org.apache.log4j.Logger;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
import org.apache.giraph.hadoop.BspPolicyProvider;
-/*end[HADOOP_NON_SECURE]*/
-import org.apache.hadoop.conf.Configuration;
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-import org.apache.hadoop.io.Text;
-/*end[HADOOP_NON_SECURE]*/
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
-import org.apache.hadoop.mapreduce.Mapper;
+
+
+/*if[HADOOP_NON_INTERVERSIONED_RPC]
+else[HADOOP_NON_INTERVERSIONED_RPC]*/
+import org.apache.hadoop.ipc.ProtocolSignature;
+/*end[HADOOP_NON_INTERVERSIONED_RPC]*/
/**
* Used to implement abstract {@link BasicRPCCommunications} methods.
@@ -64,61 +60,68 @@ import org.apache.hadoop.mapreduce.Mappe
* @param <E> Edge data
* @param <M> Message data
*/
-@SuppressWarnings("rawtypes")
-public class RPCCommunications<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- /*if[HADOOP_NON_SASL_RPC]
- extends BasicRPCCommunications<I, V, E, M, Object> {
- else[HADOOP_NON_SASL_RPC]*/
- extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
- /*end[HADOOP_NON_SASL_RPC]*/
+public class SecureRPCCommunications<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
/** Class logger */
- public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
+ public static final Logger LOG =
+ Logger.getLogger(SecureRPCCommunications.class);
/**
* Constructor.
*
* @param context Context to be saved.
- * @param configuration Configuration
* @param service Server worker.
+ * @param configuration Configuration.
* @param graphState Graph state from infrastructure.
* @throws IOException
* @throws InterruptedException
*/
- public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
- CentralizedServiceWorker<I, V, E, M> service,
- ImmutableClassesGiraphConfiguration configuration,
- GraphState<I, V, E, M> graphState) throws
- IOException, InterruptedException {
+ public SecureRPCCommunications(Mapper<?, ?, ?, ?>.Context context,
+ CentralizedServiceWorker<I, V, E, M> service,
+ ImmutableClassesGiraphConfiguration configuration,
+ GraphState<I, V, E, M> graphState) throws
+ IOException, InterruptedException {
super(context, configuration, service);
}
/**
- * Create the job token.
- *
- * @return Job token.
- */
- protected
- /*if[HADOOP_NON_SECURE]
- Object createJobToken() throws IOException {
- else[HADOOP_NON_SECURE]*/
- Token<JobTokenIdentifier> createJobToken() throws IOException {
- /*end[HADOOP_NON_SECURE]*/
- /*if[HADOOP_NON_SECURE]
- else[HADOOP_NON_SECURE]*/
+ * Create the job token.
+ *
+ * @return Job token.
+ */
+ protected Token<JobTokenIdentifier> createJobToken() throws IOException {
String localJobTokenFile = System.getenv().get(
- UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+ UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
if (localJobTokenFile != null) {
JobConf jobConf = new JobConf(conf);
Credentials credentials =
- TokenCache.loadTokens(localJobTokenFile, jobConf);
+ TokenCache.loadTokens(localJobTokenFile, jobConf);
return TokenCache.getJobToken(credentials);
}
- /*end[HADOOP_NON_SECURE]*/
return null;
}
+ /*if[HADOOP_NON_INTERVERSIONED_RPC]
+ else[HADOOP_NON_INTERVERSIONED_RPC]*/
+ /**
+ * Get the Protocol Signature for the given protocol,
+ * client version and method.
+ *
+ * <p>This implementation does not consult any of its arguments: every
+ * call returns a new signature built from VERSION_ID.
+ *
+ * @param protocol Protocol (not read by this implementation).
+ * @param clientVersion Version of Client (not read by this implementation).
+ * @param clientMethodsHash Hash of Client methods (not read by this
+ * implementation).
+ * @return New ProtocolSignature constructed from VERSION_ID and a null
+ * second constructor argument.
+ * @throws IOException Declared in the signature; nothing in this body
+ * throws it.
+ */
+ public ProtocolSignature getProtocolSignature(
+ String protocol,
+ long clientVersion,
+ int clientMethodsHash) throws IOException {
+ return new ProtocolSignature(VERSION_ID, null);
+ }
+ /*end[HADOOP_NON_INTERVERSIONED_RPC]*/
+
/**
* Get the RPC server.
*
@@ -129,36 +132,50 @@ public class RPCCommunications<I extends
* @return RPC server.
*/
@Override
- protected Server getRPCServer(
- InetSocketAddress myAddress, int numHandlers, String jobId,
- /*if[HADOOP_NON_SASL_RPC]
- Object jt) throws IOException {
- return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
- numHandlers, false, conf);
- else[HADOOP_NON_SASL_RPC]*/
- Token<JobTokenIdentifier> jt) throws IOException {
+ protected RPC.Server getRPCServer(
+ InetSocketAddress myAddress, int numHandlers, String jobId,
+ Token<JobTokenIdentifier> jt) throws IOException {
@SuppressWarnings("deprecation")
JobTokenSecretManager jobTokenSecretManager =
- new JobTokenSecretManager();
- if (jt != null) { //could be null in the case of some unit tests
+ new JobTokenSecretManager();
+ if (jt != null) { //could be null in the case of some unit tests:
+ // TODO: unit tests should use SecureRPCCommunications or
+ // RPCCommunications
+ // TODO: remove jt from RPCCommunications.
jobTokenSecretManager.addTokenForJob(jobId, jt);
if (LOG.isInfoEnabled()) {
LOG.info("getRPCServer: Added jobToken " + jt);
}
}
- Server server = RPC.getServer(RPCCommunications.class, this,
+
+ // TODO: make munge tag more specific: just use HADOOP_1 maybe.
+ /*if[HADOOP_1_AUTHORIZATION]
+ // Hadoop 1-style authorization.
+ Server server = RPC.getServer(this,
myAddress.getHostName(), myAddress.getPort(),
numHandlers, false, conf, jobTokenSecretManager);
+
+ String hadoopSecurityAuthorization =
+ ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
+ if (conf.getBoolean(hadoopSecurityAuthorization, false)) {
+ ServiceAuthorizationManager.refresh(conf, new BspPolicyProvider());
+ }
+ else[HADOOP_1_AUTHORIZATION]*/
+ // Hadoop 2+-style authorization.
+ Server server = RPC.getServer(this,
+ myAddress.getHostName(), myAddress.getPort(),
+ numHandlers, false, conf);
+
String hadoopSecurityAuthorization =
ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
if (conf.getBoolean(hadoopSecurityAuthorization, false)) {
- server.refreshServiceAcl(conf, new BspPolicyProvider());
+ ServiceAuthorizationManager sam = new ServiceAuthorizationManager();
+ sam.refresh(conf, new BspPolicyProvider());
}
+ /*end[HADOOP_1_AUTHORIZATION]*/
return server;
- /*end[HADOOP_NON_SASL_RPC]*/
}
-
/**
* Get the RPC proxy.
*
@@ -167,31 +184,24 @@ public class RPCCommunications<I extends
* @param jt Job token.
* @return Proxy of the RPC server.
*/
+ @Override
@SuppressWarnings("unchecked")
protected CommunicationsInterface<I, V, E, M> getRPCProxy(
final InetSocketAddress addr,
String jobId,
- /*if[HADOOP_NON_SASL_RPC]
- Object jt)
- else[HADOOP_NON_SASL_RPC]*/
Token<JobTokenIdentifier> jt)
- /*end[HADOOP_NON_SASL_RPC]*/
throws IOException, InterruptedException {
final Configuration config = new Configuration(conf);
- /*if[HADOOP_NON_SASL_RPC]
- return (CommunicationsInterface<I, V, E, M>)RPC.getProxy(
- CommunicationsInterface.class, VERSION_ID, addr, config);
- else[HADOOP_NON_SASL_RPC]*/
if (jt == null) {
return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
- CommunicationsInterface.class, VERSION_ID, addr, config);
+ CommunicationsInterface.class, VERSION_ID, addr, config);
}
jt.setService(new Text(addr.getAddress().getHostAddress() + ":" +
- addr.getPort()));
+ addr.getPort()));
UserGroupInformation current = UserGroupInformation.getCurrentUser();
current.addToken(jt);
UserGroupInformation owner =
- UserGroupInformation.createRemoteUser(jobId);
+ UserGroupInformation.createRemoteUser(jobId);
owner.addToken(jt);
return
owner.doAs(new PrivilegedExceptionAction<
@@ -204,12 +214,5 @@ public class RPCCommunications<I extends
CommunicationsInterface.class, VERSION_ID, addr, config);
}
});
- /*end[HADOOP_NON_SASL_RPC]*/
- }
-
- @Override
- public ServerData<I, V, E, M> getServerData() {
- throw new IllegalStateException(
- "getServerData: Tried to get ServerData while using RPC");
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Oct 3 23:32:00 2012
@@ -21,10 +21,14 @@ package org.apache.giraph.graph;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.RPCCommunications;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClientServer;
import org.apache.giraph.comm.netty.NettyWorkerClientServer;
+/*if[HADOOP_NON_SECURE]
+import org.apache.giraph.comm.RPCCommunications;
+else[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.SecureRPCCommunications;
+/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionExchange;
import org.apache.giraph.graph.partition.PartitionOwner;
@@ -159,10 +163,15 @@ public class BspServiceWorker<I extends
commService = new NettyWorkerClientServer<I, V, E, M>(
context, getConfiguration(), this);
} else {
+/*if[HADOOP_NON_SECURE]
+ commService =
+ new RPCCommunications<I, V, E, M>(context, this, getConfiguration(),
+ graphState);
+else[HADOOP_NON_SECURE]*/
commService =
- new RPCCommunications<I, V, E, M>(context, this,
- getConfiguration(),
- graphState);
+ new SecureRPCCommunications<I, V, E, M>(context, this,
+ getConfiguration(), graphState);
+/*end[HADOOP_NON_SECURE]*/
}
if (LOG.isInfoEnabled()) {
LOG.info("BspServiceWorker: maxVerticesPerTransfer = " +
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Wed Oct 3 23:32:00 2012
@@ -30,6 +30,7 @@ import org.apache.giraph.examples.Simple
import org.apache.giraph.examples.SimpleSuperstepVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.LocalityInfoSorter;
@@ -53,10 +54,10 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobContext;
-/*if[HADOOP_NON_SASL_RPC]
-else[HADOOP_NON_SASL_RPC]*/
+/*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
+else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
import org.apache.hadoop.mapreduce.task.JobContextImpl;
-/*end[HADOOP_NON_SASL_RPC]*/
+/*end[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
@@ -127,15 +128,15 @@ public class TestBspBasic extends BspCas
", graphState" + gs);
VertexInputFormat<LongWritable, IntWritable, FloatWritable, IntWritable>
inputFormat = configuration.createVertexInputFormat();
- /*if[HADOOP_NON_SASL_RPC]
+/*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
List<InputSplit> splitArray =
inputFormat.getSplits(
new JobContext(new Configuration(), new JobID()), 1);
- else[HADOOP_NON_SASL_RPC]*/
+else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
List<InputSplit> splitArray =
inputFormat.getSplits(
new JobContextImpl(new Configuration(), new JobID()), 1);
- /*end[HADOOP_NON_SASL_RPC]*/
+/*end[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
ByteArrayOutputStream byteArrayOutputStream =
new ByteArrayOutputStream();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);