You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2021/05/20 14:45:10 UTC
[tez] branch master updated: TEZ-4309: TezUtils.addToConfFromByteString throws com.google.protobuf.CodedInputStream exception (Ramesh Kumar Thangarajan via László Bodor)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 0af54df TEZ-4309: TezUtils.addToConfFromByteString throws com.google.protobuf.CodedInputStream exception (Ramesh Kumar Thangarajan via László Bodor)
0af54df is described below
commit 0af54dfa5fb114712efc1bdbcca32bb673bc53ef
Author: Ramesh Kumar Thangarajan <ra...@cloudera.com>
AuthorDate: Thu May 20 16:41:32 2021 +0200
TEZ-4309: TezUtils.addToConfFromByteString throws com.google.protobuf.CodedInputStream exception (Ramesh Kumar Thangarajan via László Bodor)
Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
.../main/java/org/apache/tez/common/TezUtils.java | 14 ++++++----
.../java/org/apache/tez/common/TestTezUtils.java | 31 ++++++++++++++++++----
2 files changed, 35 insertions(+), 10 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
index 3f02418..1c0be98 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -101,6 +101,12 @@ public class TezUtils {
return UserPayload.create(ByteBuffer.wrap(createByteStringFromConf(conf).toByteArray()));
}
+ private static DAGProtos.ConfigurationProto createConfProto(SnappyInputStream uncompressIs) throws IOException {
+ CodedInputStream in = CodedInputStream.newInstance(uncompressIs);
+ in.setSizeLimit(Integer.MAX_VALUE);
+ return DAGProtos.ConfigurationProto.parseFrom(in);
+ }
+
/**
* Convert a byte string to a Configuration object
*
@@ -112,9 +118,7 @@ public class TezUtils {
public static Configuration createConfFromByteString(ByteString byteString) throws IOException {
Objects.requireNonNull(byteString, "ByteString must be specified");
try(SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput());) {
- CodedInputStream in = CodedInputStream.newInstance(uncompressIs);
- in.setSizeLimit(Integer.MAX_VALUE);
- DAGProtos.ConfigurationProto confProto = DAGProtos.ConfigurationProto.parseFrom(in);
+ DAGProtos.ConfigurationProto confProto = createConfProto(uncompressIs);
Configuration conf = new Configuration(false);
readConfFromPB(confProto, conf);
TezClassLoader.setupForConfiguration(conf);
@@ -129,7 +133,7 @@ public class TezUtils {
UserPayload payload = context.getUserPayload();
ByteString byteString = ByteString.copyFrom(payload.getPayload());
try(SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput())) {
- DAGProtos.ConfigurationProto confProto = DAGProtos.ConfigurationProto.parseFrom(uncompressIs);
+ DAGProtos.ConfigurationProto confProto = createConfProto(uncompressIs);
readConfFromPB(confProto, configuration);
TezClassLoader.setupForConfiguration(configuration);
return configuration;
@@ -139,7 +143,7 @@ public class TezUtils {
public static void addToConfFromByteString(Configuration configuration, ByteString byteString)
throws IOException {
try(SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput())) {
- DAGProtos.ConfigurationProto confProto = DAGProtos.ConfigurationProto.parseFrom(uncompressIs);
+ DAGProtos.ConfigurationProto confProto = createConfProto(uncompressIs);
readConfFromPB(confProto, configuration);
TezClassLoader.setupForConfiguration(configuration);
}
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index f9008b9..d599caf 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -54,13 +54,10 @@ public class TestTezUtils {
checkConf(conf);
}
- @Test (timeout=20000)
- public void testByteStringToAndFromLargeConf() throws IOException {
- Configuration conf = getConf();
+ private String constructLargeValue() {
int largeSizeMinimum = 64 * 1024 * 1024;
final String alphaString = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
int largeSize = (largeSizeMinimum + alphaString.length() - 1) / alphaString.length();
-
largeSize *= alphaString.length();
assertTrue(largeSize >= alphaString.length());
StringBuilder sb = new StringBuilder(largeSize);
@@ -71,9 +68,20 @@ public class TestTezUtils {
String largeValue = sb.toString();
Assert.assertEquals(largeSize, largeValue.length());
+ return largeValue;
+ }
+
+ private ByteString createByteString(Configuration conf, String largeValue) throws IOException {
conf.set("testLargeValue", largeValue);
Assert.assertEquals(conf.size(), 7);
- ByteString bsConf = TezUtils.createByteStringFromConf(conf);
+ return TezUtils.createByteStringFromConf(conf);
+ }
+
+ @Test (timeout=20000)
+ public void testByteStringToAndFromLargeConf() throws IOException {
+ Configuration conf = getConf();
+ String largeValue = constructLargeValue();
+ ByteString bsConf = createByteString(conf, largeValue);
conf.clear();
Assert.assertEquals(conf.size(), 0);
conf = TezUtils.createConfFromByteString(bsConf);
@@ -82,6 +90,19 @@ public class TestTezUtils {
Assert.assertEquals(conf.get("testLargeValue"), largeValue);
}
+ @Test (timeout=20000)
+ public void testByteStringAddToLargeConf() throws IOException {
+ Configuration conf = getConf();
+ String largeValue = constructLargeValue();
+ ByteString bsConf = createByteString(conf, largeValue);
+ conf.clear();
+ Assert.assertEquals(conf.size(), 0);
+ TezUtils.addToConfFromByteString(conf, bsConf);
+ Assert.assertEquals(conf.size(), 7);
+ checkConf(conf);
+ Assert.assertEquals(conf.get("testLargeValue"), largeValue);
+ }
+
@Test (timeout=2000)
public void testPayloadToAndFromConf() throws IOException {
Configuration conf = getConf();