You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the word "here" and is not preserved in this plain-text copy.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/09/16 13:54:31 UTC
[accumulo] branch main updated: Upgrade Thrift to 0.13.0 (#1780)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 9e101e1 Upgrade Thrift to 0.13.0 (#1780)
9e101e1 is described below
commit 9e101e189e20943af930144878d6b65722f42a8c
Author: BukrosSzabolcs <sz...@cloudera.com>
AuthorDate: Thu Sep 16 15:54:22 2021 +0200
Upgrade Thrift to 0.13.0 (#1780)
---
.../accumulo/core/util/ThriftMessageUtil.java | 28 ++++++++++++----------
pom.xml | 2 +-
.../apache/accumulo/server/rpc/TimedProcessor.java | 4 ++--
.../accumulo/server/rpc/UGIAssumingProcessor.java | 9 +++----
4 files changed, 23 insertions(+), 20 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java
index f7372cf..e993695 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java
@@ -27,7 +27,7 @@ import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.AutoExpandingBufferWriteTransport;
+import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.thrift.transport.TMemoryInputTransport;
/**
@@ -36,18 +36,21 @@ import org.apache.thrift.transport.TMemoryInputTransport;
*/
public class ThriftMessageUtil {
- private final AutoExpandingBufferWriteTransport transport;
- private final TProtocol protocol;
+ private final int initialCapacity;
+
+ private final TMemoryInputTransport inputTransport;
+ private final TCompactProtocol inputProtocol;
public ThriftMessageUtil() {
- this(64, 1.5);
+ this(64);
}
- public ThriftMessageUtil(int initialCapacity, double growthCoefficient) {
+ public ThriftMessageUtil(int initialCapacity) {
// TODO does this make sense? better to push this down to the serialize method (accept the
// transport as an argument)?
- this.transport = new AutoExpandingBufferWriteTransport(initialCapacity, growthCoefficient);
- this.protocol = new TCompactProtocol(transport);
+ this.initialCapacity = initialCapacity;
+ this.inputTransport = new TMemoryInputTransport();
+ this.inputProtocol = new TCompactProtocol(inputTransport);
}
/**
@@ -61,14 +64,14 @@ public class ThriftMessageUtil {
*/
public ByteBuffer serialize(TBase<?,?> msg) throws IOException {
requireNonNull(msg);
- transport.reset();
+ TMemoryBuffer transport = new TMemoryBuffer(initialCapacity);
+ TProtocol protocol = new TCompactProtocol(transport);
try {
msg.write(protocol);
- // We should flush(), but we know its a noop
} catch (TException e) {
throw new IOException(e);
}
- return ByteBuffer.wrap(transport.getBuf().array(), 0, transport.getPos());
+ return ByteBuffer.wrap(transport.getArray(), 0, transport.length());
}
/**
@@ -94,10 +97,9 @@ public class ThriftMessageUtil {
public <T extends TBase<?,?>> T deserialize(byte[] serialized, int offset, int length, T instance)
throws IOException {
requireNonNull(instance);
- TCompactProtocol proto =
- new TCompactProtocol(new TMemoryInputTransport(serialized, offset, length));
+ inputTransport.reset(serialized, offset, length);
try {
- instance.read(proto);
+ instance.read(inputProtocol);
} catch (TException e) {
throw new IOException(e);
}
diff --git a/pom.xml b/pom.xml
index 4cec469..d602b26 100644
--- a/pom.xml
+++ b/pom.xml
@@ -154,7 +154,7 @@
<!-- 3.0.0-M5 causes RowHashIT.test and ShellServerIT.scansWithClassLoaderContext to fail -->
<surefire.version>3.0.0-M4</surefire.version>
<!-- Thrift version -->
- <thrift.version>0.12.0</thrift.version>
+ <thrift.version>0.13.0</thrift.version>
<unitTestMemSize>-Xmx1G</unitTestMemSize>
<!-- ZooKeeper version -->
<zookeeper.version>3.5.9</zookeeper.version>
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
index eee13fd..453d9b8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
@@ -55,12 +55,12 @@ public class TimedProcessor implements TProcessor {
}
@Override
- public boolean process(TProtocol in, TProtocol out) throws TException {
+ public void process(TProtocol in, TProtocol out) throws TException {
long now = 0;
now = System.currentTimeMillis();
thriftMetrics.addIdle(now - idleStart);
try {
- return other.process(in, out);
+ other.process(in, out);
} finally {
idleStart = System.currentTimeMillis();
thriftMetrics.addExecute(idleStart - now);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
index 2f46082..fa84f59 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
@@ -75,7 +75,7 @@ public class UGIAssumingProcessor implements TProcessor {
}
@Override
- public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+ public void process(final TProtocol inProt, final TProtocol outProt) throws TException {
TTransport trans = inProt.getTransport();
if (!(trans instanceof TSaslServerTransport)) {
throw new TException("Unexpected non-SASL transport " + trans.getClass() + ": " + trans);
@@ -100,25 +100,26 @@ public class UGIAssumingProcessor implements TProcessor {
try {
// Set the principal in the ThreadLocal for access to get authorizations
rpcPrincipal.set(remoteUser);
-
- return wrapped.process(inProt, outProt);
+ wrapped.process(inProt, outProt);
} finally {
// Unset the principal after we're done using it just to be sure that it's not incorrectly
// used in the same thread down the line.
rpcPrincipal.set(null);
}
+ break;
case DIGEST_MD5:
// The CallbackHandler, after deserializing the TokenIdentifier in the name, has already
// updated
// the rpcPrincipal for us. We don't need to do it again here.
try {
rpcMechanism.set(mechanism);
- return wrapped.process(inProt, outProt);
+ wrapped.process(inProt, outProt);
} finally {
// Unset the mechanism after we're done using it just to be sure that it's not incorrectly
// used in the same thread down the line.
rpcMechanism.set(null);
}
+ break;
default:
throw new IllegalArgumentException("Cannot process SASL mechanism " + mechanism);
}