You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/03/20 21:45:37 UTC

[4/5] incubator-kudu git commit: [java-client] add support for RPC and application feature flags

[java-client] add support for RPC and application feature flags

Change-Id: I2a0af956b7f4c949ce79e06c877db5f4220c9a5f
Reviewed-on: http://gerrit.cloudera.org:8080/2566
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/570611b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/570611b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/570611b2

Branch: refs/heads/master
Commit: 570611b246319dffb338b9f21e2c0fadc4deb78f
Parents: 53e67e9
Author: Dan Burkert <da...@cloudera.com>
Authored: Tue Mar 15 14:33:13 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Sun Mar 20 20:42:33 2016 +0000

----------------------------------------------------------------------
 .../main/java/org/kududb/client/KuduRpc.java    | 14 +++++-
 .../java/org/kududb/client/SecureRpcHelper.java | 45 ++++++++++++++++----
 .../java/org/kududb/client/TabletClient.java    | 13 +++++-
 3 files changed, 60 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/570611b2/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java b/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
index 9ef81b8..4219427 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
@@ -25,17 +25,19 @@
  */
 package org.kududb.client;
 
+import com.google.common.collect.ImmutableList;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 import com.stumbleupon.async.Deferred;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.kududb.annotations.InterfaceAudience;
 import org.kududb.util.Pair;
 import org.kududb.util.Slice;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import static org.kududb.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
 
@@ -121,6 +123,14 @@ public abstract class KuduRpc<R> {
   abstract String method();
 
   /**
+   * Returns the collection of application-specific feature flags required to service the RPC.
+   * @return the feature flags required to complete the RPC; empty by default
+   */
+  Collection<Integer> getRequiredFeatures() {
+    return ImmutableList.of();
+  }
+
+  /**
    * To be implemented by the concrete sub-type.
    * This method is expected to de-serialize a response received for the
    * current RPC.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/570611b2/java/kudu-client/src/main/java/org/kududb/client/SecureRpcHelper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/SecureRpcHelper.java b/java/kudu-client/src/main/java/org/kududb/client/SecureRpcHelper.java
index 421f618..53e108a 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/SecureRpcHelper.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/SecureRpcHelper.java
@@ -25,6 +25,8 @@
  */
 package org.kududb.client;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ZeroCopyLiteralByteString;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -36,6 +38,9 @@ import org.kududb.rpc.RpcHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -46,8 +51,6 @@ import javax.security.sasl.RealmChoiceCallback;
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
-import java.util.Map;
-import java.util.TreeMap;
 
 @InterfaceAudience.Private
 public class SecureRpcHelper {
@@ -57,31 +60,49 @@ public class SecureRpcHelper {
   private final TabletClient client;
   private SaslClient saslClient;
   public static final String SASL_DEFAULT_REALM = "default";
-  public static final Map<String, String> SASL_PROPS =
-      new TreeMap<String, String>();
+  public static final Map<String, String> SASL_PROPS = new TreeMap<>();
   private static final int SASL_CALL_ID = -33;
+  private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
+      ImmutableSet.of(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS);
   private volatile boolean negoUnderway = true;
   private boolean useWrap = false; // no QOP at the moment
+  private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
 
   public static final String USER_AND_PASSWORD = "java_client";
 
   public SecureRpcHelper(TabletClient client) {
     this.client = client;
     try {
-      saslClient = Sasl.createSaslClient(new String[]{"PLAIN"
-      }, null, null, SASL_DEFAULT_REALM,
-          SASL_PROPS, new SaslClientCallbackHandler(USER_AND_PASSWORD, USER_AND_PASSWORD));
+      saslClient = Sasl.createSaslClient(new String[]{"PLAIN"},
+                                         null,
+                                         null,
+                                         SASL_DEFAULT_REALM,
+                                         SASL_PROPS,
+                                         new SaslClientCallbackHandler(USER_AND_PASSWORD,
+                                                                       USER_AND_PASSWORD));
     } catch (SaslException e) {
       throw new RuntimeException("Could not create the SASL client", e);
     }
   }
 
+  public Set<RpcHeader.RpcFeatureFlag> getServerFeatures() {
+    Preconditions.checkState(!negoUnderway);
+    Preconditions.checkNotNull(serverFeatures);
+    return serverFeatures;
+  }
+
   public void sendHello(Channel channel) {
     sendNegotiateMessage(channel);
   }
 
   private void sendNegotiateMessage(Channel channel) {
     RpcHeader.SaslMessagePB.Builder builder = RpcHeader.SaslMessagePB.newBuilder();
+
+    // Advertise the RPC feature flags this client supports in the NEGOTIATE message.
+    for (RpcHeader.RpcFeatureFlag flag : SUPPORTED_RPC_FEATURES) {
+      builder.addSupportedFeatures(flag);
+    }
+
     builder.setState(RpcHeader.SaslMessagePB.SaslState.NEGOTIATE);
     sendSaslMessage(channel, builder.build());
   }
@@ -181,6 +202,15 @@ public class SecureRpcHelper {
     for (RpcHeader.SaslMessagePB.SaslAuth auth : response.getAuthsList()) {
       negotiatedAuth = auth;
     }
+
+    ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
+    for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {
+      if (SUPPORTED_RPC_FEATURES.contains(feature)) {
+        features.add(feature);
+      }
+    }
+    serverFeatures = features.build();
+
     byte[] saslToken = new byte[0];
     if (saslClient.hasInitialResponse())
       saslToken = saslClient.evaluateChallenge(saslToken);
@@ -192,7 +222,6 @@ public class SecureRpcHelper {
     builder.setState(RpcHeader.SaslMessagePB.SaslState.INITIATE);
     builder.addAuths(negotiatedAuth);
     sendSaslMessage(chan, builder.build());
-
   }
 
   private void handleChallengeResponse(Channel chan, RpcHeader.SaslMessagePB response) throws

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/570611b2/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
index 4556b52..e18eb55 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
@@ -26,6 +26,8 @@
 
 package org.kududb.client;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import com.stumbleupon.async.Deferred;
 
 import org.jboss.netty.handler.timeout.ReadTimeoutException;
@@ -128,8 +130,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
    * Maps an RPC ID to the in-flight RPC that was given this ID.
    * RPCs can be sent out from any thread, so we need a concurrent map.
    */
-  private final ConcurrentHashMap<Integer, KuduRpc<?>> rpcs_inflight =
-      new ConcurrentHashMap<Integer, KuduRpc<?>>();
+  private final ConcurrentHashMap<Integer, KuduRpc<?>> rpcs_inflight = new ConcurrentHashMap<>();
 
   private final AsyncKuduClient kuduClient;
 
@@ -150,6 +151,13 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
       LOG.warn(getPeerUuidLoggingString() + " sending an rpc without a timeout " + rpc);
     }
     if (chan != null) {
+      if (!rpc.getRequiredFeatures().isEmpty() &&
+          !secureRpcHelper.getServerFeatures().contains(
+              RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS)) {
+        rpc.errback(new NonRecoverableException(
+            "the server does not support the APPLICATION_FEATURE_FLAGS RPC feature"));
+      }
+
       final ChannelBuffer serialized = encode(rpc);
       if (serialized == null) {  // Error during encoding.
         return;  // Stop here.  RPC has been failed already.
@@ -197,6 +205,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     try {
       final RpcHeader.RequestHeader.Builder headerBuilder = RpcHeader.RequestHeader.newBuilder()
           .setCallId(rpcid)
+          .addAllRequiredFeatureFlags(rpc.getRequiredFeatures())
           .setRemoteMethod(
               RpcHeader.RemoteMethodPB.newBuilder().setServiceName(service).setMethodName(method));