You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/03/20 21:45:37 UTC
[4/5] incubator-kudu git commit: [java-client] add support for RPC
and application feature flags
[java-client] add support for RPC and application feature flags
Change-Id: I2a0af956b7f4c949ce79e06c877db5f4220c9a5f
Reviewed-on: http://gerrit.cloudera.org:8080/2566
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/570611b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/570611b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/570611b2
Branch: refs/heads/master
Commit: 570611b246319dffb338b9f21e2c0fadc4deb78f
Parents: 53e67e9
Author: Dan Burkert <da...@cloudera.com>
Authored: Tue Mar 15 14:33:13 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Sun Mar 20 20:42:33 2016 +0000
----------------------------------------------------------------------
.../main/java/org/kududb/client/KuduRpc.java | 14 +++++-
.../java/org/kududb/client/SecureRpcHelper.java | 45 ++++++++++++++++----
.../java/org/kududb/client/TabletClient.java | 13 +++++-
3 files changed, 60 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/570611b2/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java b/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
index 9ef81b8..4219427 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
@@ -25,17 +25,19 @@
*/
package org.kududb.client;
+import com.google.common.collect.ImmutableList;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.stumbleupon.async.Deferred;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.kududb.annotations.InterfaceAudience;
import org.kududb.util.Pair;
import org.kududb.util.Slice;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
import java.io.IOException;
+import java.util.Collection;
import static org.kududb.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
@@ -121,6 +123,14 @@ public abstract class KuduRpc<R> {
abstract String method();
/**
+ * Returns the set of application-specific feature flags required to service the RPC.
+ * @return the feature flags required to complete the RPC
+ */
+ Collection<Integer> getRequiredFeatures() {
+ return ImmutableList.of();
+ }
+
+ /**
* To be implemented by the concrete sub-type.
* This method is expected to de-serialize a response received for the
* current RPC.
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/570611b2/java/kudu-client/src/main/java/org/kududb/client/SecureRpcHelper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/SecureRpcHelper.java b/java/kudu-client/src/main/java/org/kududb/client/SecureRpcHelper.java
index 421f618..53e108a 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/SecureRpcHelper.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/SecureRpcHelper.java
@@ -25,6 +25,8 @@
*/
package org.kududb.client;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.protobuf.ByteString;
import com.google.protobuf.ZeroCopyLiteralByteString;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -36,6 +38,9 @@ import org.kududb.rpc.RpcHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -46,8 +51,6 @@ import javax.security.sasl.RealmChoiceCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
-import java.util.Map;
-import java.util.TreeMap;
@InterfaceAudience.Private
public class SecureRpcHelper {
@@ -57,31 +60,49 @@ public class SecureRpcHelper {
private final TabletClient client;
private SaslClient saslClient;
public static final String SASL_DEFAULT_REALM = "default";
- public static final Map<String, String> SASL_PROPS =
- new TreeMap<String, String>();
+ public static final Map<String, String> SASL_PROPS = new TreeMap<>();
private static final int SASL_CALL_ID = -33;
+ private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
+ ImmutableSet.of(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS);
private volatile boolean negoUnderway = true;
private boolean useWrap = false; // no QOP at the moment
+ private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
public static final String USER_AND_PASSWORD = "java_client";
public SecureRpcHelper(TabletClient client) {
this.client = client;
try {
- saslClient = Sasl.createSaslClient(new String[]{"PLAIN"
- }, null, null, SASL_DEFAULT_REALM,
- SASL_PROPS, new SaslClientCallbackHandler(USER_AND_PASSWORD, USER_AND_PASSWORD));
+ saslClient = Sasl.createSaslClient(new String[]{"PLAIN"},
+ null,
+ null,
+ SASL_DEFAULT_REALM,
+ SASL_PROPS,
+ new SaslClientCallbackHandler(USER_AND_PASSWORD,
+ USER_AND_PASSWORD));
} catch (SaslException e) {
throw new RuntimeException("Could not create the SASL client", e);
}
}
+ public Set<RpcHeader.RpcFeatureFlag> getServerFeatures() {
+ Preconditions.checkState(!negoUnderway);
+ Preconditions.checkNotNull(serverFeatures);
+ return serverFeatures;
+ }
+
public void sendHello(Channel channel) {
sendNegotiateMessage(channel);
}
private void sendNegotiateMessage(Channel channel) {
RpcHeader.SaslMessagePB.Builder builder = RpcHeader.SaslMessagePB.newBuilder();
+
+ // Advertise our supported features
+ for (RpcHeader.RpcFeatureFlag flag : SUPPORTED_RPC_FEATURES) {
+ builder.addSupportedFeatures(flag);
+ }
+
builder.setState(RpcHeader.SaslMessagePB.SaslState.NEGOTIATE);
sendSaslMessage(channel, builder.build());
}
@@ -181,6 +202,15 @@ public class SecureRpcHelper {
for (RpcHeader.SaslMessagePB.SaslAuth auth : response.getAuthsList()) {
negotiatedAuth = auth;
}
+
+ ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
+ for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {
+ if (SUPPORTED_RPC_FEATURES.contains(feature)) {
+ features.add(feature);
+ }
+ }
+ serverFeatures = features.build();
+
byte[] saslToken = new byte[0];
if (saslClient.hasInitialResponse())
saslToken = saslClient.evaluateChallenge(saslToken);
@@ -192,7 +222,6 @@ public class SecureRpcHelper {
builder.setState(RpcHeader.SaslMessagePB.SaslState.INITIATE);
builder.addAuths(negotiatedAuth);
sendSaslMessage(chan, builder.build());
-
}
private void handleChallengeResponse(Channel chan, RpcHeader.SaslMessagePB response) throws
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/570611b2/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
index 4556b52..e18eb55 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
@@ -26,6 +26,8 @@
package org.kududb.client;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import com.stumbleupon.async.Deferred;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
@@ -128,8 +130,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
* Maps an RPC ID to the in-flight RPC that was given this ID.
* RPCs can be sent out from any thread, so we need a concurrent map.
*/
- private final ConcurrentHashMap<Integer, KuduRpc<?>> rpcs_inflight =
- new ConcurrentHashMap<Integer, KuduRpc<?>>();
+ private final ConcurrentHashMap<Integer, KuduRpc<?>> rpcs_inflight = new ConcurrentHashMap<>();
private final AsyncKuduClient kuduClient;
@@ -150,6 +151,13 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
LOG.warn(getPeerUuidLoggingString() + " sending an rpc without a timeout " + rpc);
}
if (chan != null) {
+ if (!rpc.getRequiredFeatures().isEmpty() &&
+ !secureRpcHelper.getServerFeatures().contains(
+ RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS)) {
+ rpc.errback(new NonRecoverableException(
+ "the server does not support the APPLICATION_FEATURE_FLAGS RPC feature"));
+ }
+
final ChannelBuffer serialized = encode(rpc);
if (serialized == null) { // Error during encoding.
return; // Stop here. RPC has been failed already.
@@ -197,6 +205,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
try {
final RpcHeader.RequestHeader.Builder headerBuilder = RpcHeader.RequestHeader.newBuilder()
.setCallId(rpcid)
+ .addAllRequiredFeatureFlags(rpc.getRequiredFeatures())
.setRemoteMethod(
RpcHeader.RemoteMethodPB.newBuilder().setServiceName(service).setMethodName(method));