You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/09/16 13:54:31 UTC

[accumulo] branch main updated: Upgrade Thrift to 0.13.0 (#1780)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e101e1  Upgrade Thrift to 0.13.0 (#1780)
9e101e1 is described below

commit 9e101e189e20943af930144878d6b65722f42a8c
Author: BukrosSzabolcs <sz...@cloudera.com>
AuthorDate: Thu Sep 16 15:54:22 2021 +0200

    Upgrade Thrift to 0.13.0 (#1780)
---
 .../accumulo/core/util/ThriftMessageUtil.java      | 28 ++++++++++++----------
 pom.xml                                            |  2 +-
 .../apache/accumulo/server/rpc/TimedProcessor.java |  4 ++--
 .../accumulo/server/rpc/UGIAssumingProcessor.java  |  9 +++----
 4 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java
index f7372cf..e993695 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java
@@ -27,7 +27,7 @@ import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.AutoExpandingBufferWriteTransport;
+import org.apache.thrift.transport.TMemoryBuffer;
 import org.apache.thrift.transport.TMemoryInputTransport;
 
 /**
@@ -36,18 +36,21 @@ import org.apache.thrift.transport.TMemoryInputTransport;
  */
 public class ThriftMessageUtil {
 
-  private final AutoExpandingBufferWriteTransport transport;
-  private final TProtocol protocol;
+  private final int initialCapacity;
+
+  private final TMemoryInputTransport inputTransport;
+  private final TCompactProtocol inputProtocol;
 
   public ThriftMessageUtil() {
-    this(64, 1.5);
+    this(64);
   }
 
-  public ThriftMessageUtil(int initialCapacity, double growthCoefficient) {
+  public ThriftMessageUtil(int initialCapacity) {
     // TODO does this make sense? better to push this down to the serialize method (accept the
     // transport as an argument)?
-    this.transport = new AutoExpandingBufferWriteTransport(initialCapacity, growthCoefficient);
-    this.protocol = new TCompactProtocol(transport);
+    this.initialCapacity = initialCapacity;
+    this.inputTransport = new TMemoryInputTransport();
+    this.inputProtocol = new TCompactProtocol(inputTransport);
   }
 
   /**
@@ -61,14 +64,14 @@ public class ThriftMessageUtil {
    */
   public ByteBuffer serialize(TBase<?,?> msg) throws IOException {
     requireNonNull(msg);
-    transport.reset();
+    TMemoryBuffer transport = new TMemoryBuffer(initialCapacity);
+    TProtocol protocol = new TCompactProtocol(transport);
     try {
       msg.write(protocol);
-      // We should flush(), but we know its a noop
     } catch (TException e) {
       throw new IOException(e);
     }
-    return ByteBuffer.wrap(transport.getBuf().array(), 0, transport.getPos());
+    return ByteBuffer.wrap(transport.getArray(), 0, transport.length());
   }
 
   /**
@@ -94,10 +97,9 @@ public class ThriftMessageUtil {
   public <T extends TBase<?,?>> T deserialize(byte[] serialized, int offset, int length, T instance)
       throws IOException {
     requireNonNull(instance);
-    TCompactProtocol proto =
-        new TCompactProtocol(new TMemoryInputTransport(serialized, offset, length));
+    inputTransport.reset(serialized, offset, length);
     try {
-      instance.read(proto);
+      instance.read(inputProtocol);
     } catch (TException e) {
       throw new IOException(e);
     }
diff --git a/pom.xml b/pom.xml
index 4cec469..d602b26 100644
--- a/pom.xml
+++ b/pom.xml
@@ -154,7 +154,7 @@
     <!-- 3.0.0-M5 causes RowHashIT.test and ShellServerIT.scansWithClassLoaderContext to fail -->
     <surefire.version>3.0.0-M4</surefire.version>
     <!-- Thrift version -->
-    <thrift.version>0.12.0</thrift.version>
+    <thrift.version>0.13.0</thrift.version>
     <unitTestMemSize>-Xmx1G</unitTestMemSize>
     <!-- ZooKeeper version -->
     <zookeeper.version>3.5.9</zookeeper.version>
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
index eee13fd..453d9b8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
@@ -55,12 +55,12 @@ public class TimedProcessor implements TProcessor {
   }
 
   @Override
-  public boolean process(TProtocol in, TProtocol out) throws TException {
+  public void process(TProtocol in, TProtocol out) throws TException {
     long now = 0;
     now = System.currentTimeMillis();
     thriftMetrics.addIdle(now - idleStart);
     try {
-      return other.process(in, out);
+      other.process(in, out);
     } finally {
       idleStart = System.currentTimeMillis();
       thriftMetrics.addExecute(idleStart - now);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
index 2f46082..fa84f59 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
@@ -75,7 +75,7 @@ public class UGIAssumingProcessor implements TProcessor {
   }
 
   @Override
-  public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+  public void process(final TProtocol inProt, final TProtocol outProt) throws TException {
     TTransport trans = inProt.getTransport();
     if (!(trans instanceof TSaslServerTransport)) {
       throw new TException("Unexpected non-SASL transport " + trans.getClass() + ": " + trans);
@@ -100,25 +100,26 @@ public class UGIAssumingProcessor implements TProcessor {
         try {
           // Set the principal in the ThreadLocal for access to get authorizations
           rpcPrincipal.set(remoteUser);
-
-          return wrapped.process(inProt, outProt);
+          wrapped.process(inProt, outProt);
         } finally {
           // Unset the principal after we're done using it just to be sure that it's not incorrectly
           // used in the same thread down the line.
           rpcPrincipal.set(null);
         }
+        break;
       case DIGEST_MD5:
         // The CallbackHandler, after deserializing the TokenIdentifier in the name, has already
         // updated
         // the rpcPrincipal for us. We don't need to do it again here.
         try {
           rpcMechanism.set(mechanism);
-          return wrapped.process(inProt, outProt);
+          wrapped.process(inProt, outProt);
         } finally {
           // Unset the mechanism after we're done using it just to be sure that it's not incorrectly
           // used in the same thread down the line.
           rpcMechanism.set(null);
         }
+        break;
       default:
         throw new IllegalArgumentException("Cannot process SASL mechanism " + mechanism);
     }