Posted to commits@hive.apache.org by ab...@apache.org on 2021/09/14 09:07:23 UTC
[hive] branch master updated: HIVE-25501: Provide a filter for removing useless properties from PartitionDesc objects before MapWork serialization (#2620) (Laszlo Bodor reviewed by Rajesh Balamohan)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a4e3552 HIVE-25501: Provide a filter for removing useless properties from PartitionDesc objects before MapWork serialization (#2620) (Laszlo Bodor reviewed by Rajesh Balamohan)
a4e3552 is described below
commit a4e3552e9eff53dd44d3aaae4f91ad2fb89cec6e
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Tue Sep 14 11:07:11 2021 +0200
HIVE-25501: Provide a filter for removing useless properties from PartitionDesc objects before MapWork serialization (#2620) (Laszlo Bodor reviewed by Rajesh Balamohan)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 7 +
.../hive/ql/exec/SerializationUtilities.java | 148 +++++++++++----
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 2 +-
.../org/apache/hadoop/hive/ql/plan/MapWork.java | 1 -
.../hive/ql/exec/TestSerializationUtilities.java | 199 +++++++++++++++++++++
5 files changed, 318 insertions(+), 39 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cb5b981..f61b903 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4524,6 +4524,13 @@ public class HiveConf extends Configuration {
HIVE_RPC_QUERY_PLAN("hive.rpc.query.plan", false,
"Whether to send the query plan via local resource or RPC"),
+ HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES("hive.plan.mapwork.serialization.skip.properties", "",
+ "Comma separated list of properties which are not needed at execution time, so they can be removed "
+ + "from PartitionDesc properties before serialization. The config can contain exact strings and regex "
+ + "expressions; regex mode is activated if at least 1 asterisk (*) is present in the current word:\n"
+ + "rawDataSize: exact string match, removes only the rawDataSize property\n"
+ + ".*Size: regex match, removes every property ending with 'Size'\n"
+ + "numRows,impala_.*chunk.*: comma separated and mixed (handles strings and regexes at the same time)"),
HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true,
"Whether to generate the splits locally or in the AM (tez only)"),
HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true,
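For illustration, a minimal sketch of setting the new property on a HiveConf (assumed fresh instance, not part of the commit); the value mixes an exact name with a regex:

    // "rawDataSize" is an exact match; "impala_.*chunk.*" contains asterisks,
    // so it is applied as a regex against every PartitionDesc property key.
    HiveConf conf = new HiveConf();
    conf.setVar(HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES,
        "rawDataSize,impala_.*chunk.*");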
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index d4e8407..56e1b32 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -31,15 +31,22 @@ import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties;
import org.apache.hadoop.hive.common.type.TimestampTZ;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
@@ -60,6 +67,7 @@ import com.esotericsoftware.kryo.Registration;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.Pool;
+import com.google.common.annotations.VisibleForTesting;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
/**
@@ -111,8 +119,10 @@ public class SerializationUtilities {
/**
* Provides general-purpose hooks for specific types, as well as a global hook.
*/
- private static class KryoWithHooks extends Kryo {
+ private static class KryoWithHooks extends Kryo implements Configurable {
private Hook globalHook;
+ // this should be set on-the-fly after borrowing this instance and needs to be reset on release
+ private Configuration configuration;
@SuppressWarnings({"unchecked", "rawtypes"})
private static final class SerializerWithHook extends com.esotericsoftware.kryo.Serializer {
@@ -206,6 +216,17 @@ public class SerializationUtilities {
T result = super.readObject(input, type, serializer);
return ponderGlobalPostReadHook(hook, result);
}
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.configuration = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return configuration;
+ }
}
private static final Object FAKE_REFERENCE = new Object();
@@ -234,6 +255,7 @@ public class SerializationUtilities {
kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
kryo.register(new java.util.ArrayList().subList(0,0).getClass(), new ArrayListSubListSerializer());
kryo.register(CopyOnFirstWriteProperties.class, new CopyOnFirstWritePropertiesSerializer());
+ kryo.register(MapWork.class, new MapWorkSerializer(kryo, MapWork.class));
kryo.register(PartitionDesc.class, new PartitionDescSerializer(kryo, PartitionDesc.class));
((DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
@@ -254,8 +276,13 @@ public class SerializationUtilities {
* @return kryo instance
*/
public static Kryo borrowKryo() {
+ return borrowKryo(null);
+ }
+
+ public static Kryo borrowKryo(Configuration configuration) {
Kryo kryo = kryoPool.obtain();
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ ((KryoWithHooks) kryo).setConf(configuration);
return kryo;
}
@@ -265,6 +292,9 @@ public class SerializationUtilities {
* @param kryo - kryo instance to be released
*/
public static void releaseKryo(Kryo kryo) {
+ if (kryo != null) {
+ ((KryoWithHooks) kryo).setConf(null);
+ }
kryoPool.free(kryo);
}
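The pool contract after this change: the Configuration is attached when an instance is borrowed and cleared again on release. A minimal usage sketch (conf, mapWork and out are assumed to exist; the Utilities.setBaseWork hunk further down follows the same pattern):

    Kryo kryo = SerializationUtilities.borrowKryo(conf);
    try {
      // MapWorkSerializer can now read the skip-properties config via getConf()
      SerializationUtilities.serializePlan(kryo, mapWork, out);
    } finally {
      // resets the attached Configuration before the instance returns to the pool
      SerializationUtilities.releaseKryo(kryo);
    }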
@@ -542,6 +572,70 @@ public class SerializationUtilities {
}
/**
+ * We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link MapWork} objects, e.g. in
+ * order to remove useless properties at execution time.
+ */
+ private static class MapWorkSerializer extends FieldSerializer<MapWork> {
+
+ public MapWorkSerializer(Kryo kryo, Class type) {
+ super(kryo, type);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, MapWork mapWork) {
+ filterMapworkProperties(kryo, mapWork);
+ super.write(kryo, output, mapWork);
+ }
+
+ private void filterMapworkProperties(Kryo kryo, MapWork mapWork) {
+ Configuration configuration = ((KryoWithHooks) kryo).getConf();
+ if (configuration == null || HiveConf
+ .getVar(configuration, HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES).isEmpty()) {
+ return;
+ }
+ String[] filterProps =
+ HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES).split(",");
+ for (String prop : filterProps) {
+ boolean isRegex = isRegex(prop);
+ Pattern pattern = Pattern.compile(prop);
+
+ LOG.debug("Trying to filter MapWork properties (regex: " + isRegex + "): " + prop);
+
+ for (Entry<Path, PartitionDesc> partDescEntry : mapWork.getPathToPartitionInfo().entrySet()) {
+ /*
+ * remove by regex, could be a bit more expensive because of iterating and matching regexes
+ * e.g.: in case of impala_intermediate_stats_chunk1, impala_intermediate_stats_chunk2, user only needs to
+ * configure impala_intermediate_stats_chunk.*
+ */
+ if (isRegex) {
+ Iterator<Entry<Object, Object>> itProps =
+ partDescEntry.getValue().getProperties().entrySet().iterator();
+ while (itProps.hasNext()) {
+ Map.Entry<Object, Object> entry = itProps.next();
+ String actualProp = (String) entry.getKey();
+ Matcher matcher = pattern.matcher(actualProp);
+
+ if (matcher.find()) {
+ LOG.debug("Removed '{}' from MapWork (partition: {})", actualProp, partDescEntry.getKey());
+ itProps.remove();
+ }
+ }
+ } else {
+ Object objRemoved = partDescEntry.getValue().getProperties().remove(prop);
+ if (objRemoved != null) {
+ LOG.debug("Removed '{}' from MapWork (partition: {})", prop, partDescEntry.getKey());
+ }
+ }
+ }
+ }
+ }
+
+ private boolean isRegex(String prop) {
+ return prop.contains("*");
+ }
+ }
+
+ /**
* We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link PartitionDesc} objects
* in order to invoke any string interning code present in the "setter" methods. {@link
* PartitionDesc} objects are usually stored by {@link MapWork} objects and contain duplicate info
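To make the matching rules of filterMapworkProperties concrete: a word is treated as a regex only when it contains an asterisk, and matching uses Matcher.find(), i.e. an unanchored search anywhere in the key. A standalone sketch with made-up property values:

    Properties props = new Properties();
    props.put("rawDataSize", "10");
    props.put("totalSize", "100");
    props.put("numRows", "5");

    // ".*Size" contains '*', so regex mode kicks in; find() removes rawDataSize
    // and totalSize, while numRows survives.
    Pattern pattern = Pattern.compile(".*Size");
    props.keySet().removeIf(key -> pattern.matcher((String) key).find());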
@@ -573,31 +667,24 @@ public class SerializationUtilities {
* @param out The stream to write to.
*/
public static void serializePlan(Object plan, OutputStream out) {
- serializePlan(plan, out, false);
- }
-
- public static void serializePlan(Kryo kryo, Object plan, OutputStream out) {
- serializePlan(kryo, plan, out, false);
+ serializePlan(plan, out, null);
}
- private static void serializePlan(Object plan, OutputStream out, boolean cloningPlan) {
- Kryo kryo = borrowKryo();
+ @VisibleForTesting
+ static void serializePlan(Object plan, OutputStream out, Configuration configuration) {
+ Kryo kryo = borrowKryo(configuration);
try {
- serializePlan(kryo, plan, out, cloningPlan);
+ serializePlan(kryo, plan, out);
} finally {
releaseKryo(kryo);
}
}
- private static void serializePlan(Kryo kryo, Object plan, OutputStream out, boolean cloningPlan) {
+ public static void serializePlan(Kryo kryo, Object plan, OutputStream out) {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
LOG.info("Serializing " + plan.getClass().getSimpleName() + " using kryo");
- if (cloningPlan) {
- serializeObjectByKryo(kryo, plan, out);
- } else {
- serializeObjectByKryo(kryo, plan, out);
- }
+ serializeObjectByKryo(kryo, plan, out);
perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
}
@@ -609,35 +696,22 @@ public class SerializationUtilities {
* @return The plan, such as QueryPlan, MapredWork, etc.
*/
public static <T> T deserializePlan(InputStream in, Class<T> planClass) {
- return deserializePlan(in, planClass, false);
- }
-
- public static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass) {
- return deserializePlan(kryo, in, planClass, false);
- }
-
- private static <T> T deserializePlan(InputStream in, Class<T> planClass, boolean cloningPlan) {
Kryo kryo = borrowKryo();
T result = null;
try {
- result = deserializePlan(kryo, in, planClass, cloningPlan);
+ result = deserializePlan(kryo, in, planClass);
} finally {
releaseKryo(kryo);
}
return result;
}
- private static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass,
- boolean cloningPlan) {
+ public static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass) {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
T plan;
LOG.info("Deserializing " + planClass.getSimpleName() + " using kryo");
- if (cloningPlan) {
- plan = deserializeObjectByKryo(kryo, in, planClass);
- } else {
- plan = deserializeObjectByKryo(kryo, in, planClass);
- }
+ plan = deserializeObjectByKryo(kryo, in, planClass);
perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
return plan;
}
@@ -654,9 +728,9 @@ public class SerializationUtilities {
Operator<?> op = plan.getAnyOperator();
CompilationOpContext ctx = (op == null) ? null : op.getCompilationOpContext();
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- serializePlan(plan, baos, true);
+ serializePlan(plan, baos);
MapredWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
- MapredWork.class, true);
+ MapredWork.class);
// Restore the context.
for (Operator<?> newOp : newPlan.getAllOperators()) {
newOp.setCompilationOpContext(ctx);
@@ -676,11 +750,11 @@ public class SerializationUtilities {
}
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
CompilationOpContext ctx = roots.get(0).getCompilationOpContext();
- serializePlan(roots, baos, true);
+ serializePlan(roots, baos);
@SuppressWarnings("unchecked")
List<Operator<?>> result =
deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
- roots.getClass(), true);
+ roots.getClass());
// Restore the context.
LinkedList<Operator<?>> newOps = new LinkedList<>(result);
while (!newOps.isEmpty()) {
@@ -705,9 +779,9 @@ public class SerializationUtilities {
Operator<?> op = plan.getAnyRootOperator();
CompilationOpContext ctx = (op == null) ? null : op.getCompilationOpContext();
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- serializePlan(plan, baos, true);
+ serializePlan(plan, baos);
BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
- plan.getClass(), true);
+ plan.getClass());
// Restore the context.
for (Operator<?> newOp : newPlan.getAllOperators()) {
newOp.setCompilationOpContext(ctx);
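Net effect of the refactor: the cloningPlan flag was dead code (both branches called the same method) and is gone, while the Configuration now rides along with the borrowed Kryo. A round trip through the @VisibleForTesting overload looks like this sketch, which is essentially what doSerDeser in the new test below does:

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // properties matching the skip config are dropped during the write
    SerializationUtilities.serializePlan(mapWork, baos, conf);
    MapWork filtered = SerializationUtilities.deserializePlan(
        new ByteArrayInputStream(baos.toByteArray()), MapWork.class);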
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 01f0967..7474603 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -582,7 +582,7 @@ public final class Utilities {
}
private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) {
- Kryo kryo = SerializationUtilities.borrowKryo();
+ Kryo kryo = SerializationUtilities.borrowKryo(conf);
try {
setPlanPath(conf, hiveScratchDir);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 729d2fc..dbbd8c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.IConfigureJobConf;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestSerializationUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestSerializationUtilities.java
new file mode 100644
index 0000000..77c0997
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestSerializationUtilities.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSerializationUtilities {
+
+ @Test
+ public void testEveryPropertiesAreSerialized() throws Exception {
+ MapWork mapWork = doSerDeser(null);
+
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1", true);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2", true);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", true);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+ }
+
+ @Test
+ public void testRegexFilterAll() throws Exception {
+ MapWork mapWork = doSerDeser(getConfWithSkipConfig(".*"));
+
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+ false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+ false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", false);
+ }
+
+ @Test
+ public void testRegexFilterSomeProps() throws Exception {
+ MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk.*"));
+
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+ false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+ false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", true);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+ }
+
+ @Test
+ public void testString() throws Exception {
+ MapWork mapWork = doSerDeser(getConfWithSkipConfig("rawDataSize"));
+
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1", true);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2", true);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+ }
+
+ @Test
+ public void testStringAndBadRegex() throws Exception {
+ MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk,rawDataSize"));
+
+ // impala_intermediate_stats_chunk props are not filtered in this case because only "*" activates regex mode
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1", true);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2", true);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+ }
+
+ @Test
+ public void testStringRegexMixed() throws Exception {
+ MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk.*,rawDataSize,.*ddl"));
+
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+ false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+ false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", false);
+ }
+
+ @Test
+ public void testSkippingAppliesToAllPartitions() throws Exception {
+ MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk.*,rawDataSize,.*ddl"));
+
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+ false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+ false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", false);
+
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "impala_intermediate_stats_chunk1",
+ false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "impala_intermediate_stats_chunk2",
+ false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "rawDataSize", false);
+ assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "serialization.ddl", false);
+ }
+
+ private MapWork doSerDeser(Configuration configuration) throws Exception {
+ MapWork mapWork = mockMapWorkWithSomePartitionDescProperties();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ SerializationUtilities.serializePlan(mapWork, baos, configuration);
+ InputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ MapWork mapWorkDeserialized = SerializationUtilities.deserializePlan(bais, MapWork.class);
+ baos.close();
+ bais.close();
+ return mapWorkDeserialized;
+ }
+
+ private Configuration getConfWithSkipConfig(String value) {
+ Configuration configuration = new Configuration();
+ HiveConf.setVar(configuration, HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES, value);
+ return configuration;
+ }
+
+ private void assertPartitionDescPropertyPresence(MapWork mapWork, String partitionPath, String prop,
+ boolean isPresent) {
+ String value = mapWork.getPathToPartitionInfo().get(new Path(partitionPath)).getProperties().getProperty(prop);
+ Assert.assertTrue(String.format("'%s' is%ssupposed to be present", prop, (isPresent ? " " : " not ")),
+ isPresent ? value != null : value == null);
+ }
+
+ private static MapWork mockMapWorkWithSomePartitionDescProperties() throws Exception {
+ String tableName = "test_table";
+ int numPartitions = 2;
+ Path root = new Path("/warehouse", "test_table");
+
+ String[] partPath = new String[numPartitions];
+ StringBuilder buffer = new StringBuilder();
+ for (int p = 0; p < numPartitions; ++p) {
+ partPath[p] = new Path(root, "p=" + p).toString();
+ if (p != 0) {
+ buffer.append(',');
+ }
+ buffer.append(partPath[p]);
+ }
+
+ Properties tblProps = new Properties();
+ TableDesc tbl = new TableDesc(OrcInputFormat.class, OrcOutputFormat.class, tblProps);
+
+ MapWork mapWork = new MapWork();
+
+ Map<Path, List<String>> aliasMap = new LinkedHashMap<>();
+ List<String> aliases = new ArrayList<String>();
+ aliases.add(tableName);
+
+ LinkedHashMap<Path, PartitionDesc> partMap = new LinkedHashMap<>();
+ for (int p = 0; p < numPartitions; ++p) {
+ Path path = new Path(partPath[p]);
+ aliasMap.put(path, aliases);
+ LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
+ PartitionDesc part = new PartitionDesc(tbl, partSpec);
+ part.setVectorPartitionDesc(
+ VectorPartitionDesc.createVectorizedInputFileFormat("MockInputFileFormatClassName", false, null));
+
+ part.getProperties().put("impala_intermediate_stats_chunk1", "asdfghjk12345678");
+ part.getProperties().put("impala_intermediate_stats_chunk2", "asdfghjk12345678");
+ part.getProperties().put("rawDataSize", "10");
+ part.getProperties().put("serialization.ddl", "asdf");
+
+ partMap.put(path, part);
+ }
+ mapWork.setPathToAliases(aliasMap);
+ mapWork.setPathToPartitionInfo(partMap);
+
+ return mapWork;
+ }
+}