Posted to commits@hive.apache.org by ab...@apache.org on 2021/09/14 09:07:23 UTC

[hive] branch master updated: HIVE-25501: Provide a filter for removing useless properties from PartitionDesc objects before MapWork serialization (#2620) (Laszlo Bodor reviewed by Rajesh Balamohan)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a4e3552  HIVE-25501: Provide a filter for removing useless properties from PartitionDesc objects before MapWork serialization (#2620) (Laszlo Bodor reviewed by Rajesh Balamohan)
a4e3552 is described below

commit a4e3552e9eff53dd44d3aaae4f91ad2fb89cec6e
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Tue Sep 14 11:07:11 2021 +0200

    HIVE-25501: Provide a filter for removing useless properties from PartitionDesc objects before MapWork serialization (#2620) (Laszlo Bodor reviewed by Rajesh Balamohan)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   7 +
 .../hive/ql/exec/SerializationUtilities.java       | 148 +++++++++++----
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  |   2 +-
 .../org/apache/hadoop/hive/ql/plan/MapWork.java    |   1 -
 .../hive/ql/exec/TestSerializationUtilities.java   | 199 +++++++++++++++++++++
 5 files changed, 318 insertions(+), 39 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cb5b981..f61b903 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4524,6 +4524,13 @@ public class HiveConf extends Configuration {
 
     HIVE_RPC_QUERY_PLAN("hive.rpc.query.plan", false,
         "Whether to send the query plan via local resource or RPC"),
+    HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES("hive.plan.mapwork.serialization.skip.properties", "",
+        "Comma separated list of properties which is not needed in execution time, so can be removed "
+            + "from PartitionDesc properties before serialization, config can contain exact strings and regex "
+            + "expressions, the regex mode is activated if at least 1 asterisk (*) is present in the current word: "
+            + "rawDataSize                exact string match, removes only rawDataSize property"
+            + ".*Size                     regex match, removes every property ending with 'Size'"
+            + "numRows,impala_.*chunk.*   comma separated and mixed (handles strings and regexes at the same time)"),
     HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true,
         "Whether to generate the splits locally or in the AM (tez only)"),
     HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true,
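
For illustration, a minimal sketch of setting the new property from Java; the skip-list value below is just an example, reusing the property names from the description above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    Configuration conf = new Configuration();
    // remove the exact property "rawDataSize" and every property matching
    // the regex "impala_.*chunk.*" before MapWork serialization
    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES,
        "rawDataSize,impala_.*chunk.*");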
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index d4e8407..56e1b32 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -31,15 +31,22 @@ import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties;
 import org.apache.hadoop.hive.common.type.TimestampTZ;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
@@ -60,6 +67,7 @@ import com.esotericsoftware.kryo.Registration;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.util.Pool;
+import com.google.common.annotations.VisibleForTesting;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 
 /**
@@ -111,8 +119,10 @@ public class SerializationUtilities {
   /**
    * Provides general-purpose hooks for specific types, as well as a global hook.
    */
-  private static class KryoWithHooks extends Kryo {
+  private static class KryoWithHooks extends Kryo implements Configurable {
     private Hook globalHook;
+    // this should be set on-the-fly after borrowing this instance and needs to be reset on release
+    private Configuration configuration;
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     private static final class SerializerWithHook extends com.esotericsoftware.kryo.Serializer {
@@ -206,6 +216,17 @@ public class SerializationUtilities {
       T result = super.readObject(input, type, serializer);
       return ponderGlobalPostReadHook(hook, result);
     }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.configuration = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+      return configuration;
+    }
   }
 
   private static final Object FAKE_REFERENCE = new Object();
@@ -234,6 +255,7 @@ public class SerializationUtilities {
     kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
     kryo.register(new java.util.ArrayList().subList(0,0).getClass(), new ArrayListSubListSerializer());
     kryo.register(CopyOnFirstWriteProperties.class, new CopyOnFirstWritePropertiesSerializer());
+    kryo.register(MapWork.class, new MapWorkSerializer(kryo, MapWork.class));
     kryo.register(PartitionDesc.class, new PartitionDescSerializer(kryo, PartitionDesc.class));
 
     ((DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
@@ -254,8 +276,13 @@ public class SerializationUtilities {
    * @return kryo instance
    */
   public static Kryo borrowKryo() {
+    return borrowKryo(null);
+  }
+
+  public static Kryo borrowKryo(Configuration configuration) {
     Kryo kryo = kryoPool.obtain();
     kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+    ((KryoWithHooks) kryo).setConf(configuration);
     return kryo;
   }
 
@@ -265,6 +292,9 @@ public class SerializationUtilities {
    * @param kryo - kryo instance to be released
    */
   public static void releaseKryo(Kryo kryo) {
+    if (kryo != null) {
+      ((KryoWithHooks) kryo).setConf(null);
+    }
     kryoPool.free(kryo);
   }
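
For reference, the intended borrow/release lifecycle with a Configuration attached; a minimal sketch assuming conf, plan and out are already in scope:

    import com.esotericsoftware.kryo.Kryo;

    Kryo kryo = SerializationUtilities.borrowKryo(conf);
    try {
      // MapWorkSerializer picks up the skip-properties config from this Kryo instance
      SerializationUtilities.serializePlan(kryo, plan, out);
    } finally {
      // releaseKryo() clears the Configuration before returning the instance to the pool
      SerializationUtilities.releaseKryo(kryo);
    }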
 
@@ -542,6 +572,70 @@ public class SerializationUtilities {
   }
 
   /**
+   * We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link MapWork} objects, e.g. in
+   * order to remove properties that are useless at execution time.
+   */
+  private static class MapWorkSerializer extends FieldSerializer<MapWork> {
+
+    public MapWorkSerializer(Kryo kryo, Class type) {
+      super(kryo, type);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, MapWork mapWork) {
+      filterMapworkProperties(kryo, mapWork);
+      super.write(kryo, output, mapWork);
+    }
+
+    private void filterMapworkProperties(Kryo kryo, MapWork mapWork) {
+      Configuration configuration = ((KryoWithHooks) kryo).getConf();
+      if (configuration == null || HiveConf
+          .getVar(configuration, HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES).isEmpty()) {
+        return;
+      }
+      String[] filterProps =
+          HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES).split(",");
+      for (String prop : filterProps) {
+        boolean isRegex = isRegex(prop);
+        Pattern pattern = Pattern.compile(prop);
+
+        LOG.debug("Trying to filter MapWork properties (regex: " + isRegex + "): " + prop);
+
+        for (Entry<Path, PartitionDesc> partDescEntry : mapWork.getPathToPartitionInfo().entrySet()) {
+          /*
+           * Remove by regex: this can be a bit more expensive because it iterates over the properties and
+           * matches regexes. E.g. in the case of impala_intermediate_stats_chunk1 and
+           * impala_intermediate_stats_chunk2, the user only needs to configure impala_intermediate_stats_chunk.*
+           */
+          if (isRegex) {
+            Iterator<Entry<Object, Object>> itProps =
+                partDescEntry.getValue().getProperties().entrySet().iterator();
+            while (itProps.hasNext()) {
+              Map.Entry<Object, Object> entry = itProps.next();
+              String actualProp = (String) entry.getKey();
+              Matcher matcher = pattern.matcher(actualProp);
+
+              if (matcher.find()) {
+                LOG.debug("Removed '{}' from MapWork (partition: {})", actualProp, partDescEntry.getKey());
+                itProps.remove();
+              }
+            }
+          } else {
+            Object objRemoved = partDescEntry.getValue().getProperties().remove(prop);
+            if (objRemoved != null) {
+              LOG.debug("Removed '{}' from MapWork (partition: {})", prop, partDescEntry.getKey());
+            }
+          }
+        }
+      }
+    }
+
+    private boolean isRegex(String prop) {
+      return prop.contains("*");
+    }
+  }
+
+  /**
    * We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link PartitionDesc} objects
    * in order to invoke any string interning code present in the "setter" methods. {@link
    * PartitionDesc} objects are usually stored by {@link MapWork} objects and contain duplicate info
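
Note that the filter above relies on Matcher.find(), so a regex is matched against any subsequence of the property name, not against the whole name. A small illustrative check:

    import java.util.regex.Pattern;

    Pattern pattern = Pattern.compile("impala_.*chunk.*");
    // find() returns true because a matching subsequence exists; anchoring the
    // expression (e.g. "^impala_.*chunk.*$") would force a whole-name match
    boolean matches = pattern.matcher("impala_intermediate_stats_chunk1").find(); // true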
@@ -573,31 +667,24 @@ public class SerializationUtilities {
    * @param out  The stream to write to.
    */
   public static void serializePlan(Object plan, OutputStream out) {
-    serializePlan(plan, out, false);
-  }
-
-  public static void serializePlan(Kryo kryo, Object plan, OutputStream out) {
-    serializePlan(kryo, plan, out, false);
+    serializePlan(plan, out, null);
   }
 
-  private static void serializePlan(Object plan, OutputStream out, boolean cloningPlan) {
-    Kryo kryo = borrowKryo();
+  @VisibleForTesting
+  static void serializePlan(Object plan, OutputStream out, Configuration configuration) {
+    Kryo kryo = borrowKryo(configuration);
     try {
-      serializePlan(kryo, plan, out, cloningPlan);
+      serializePlan(kryo, plan, out);
     } finally {
       releaseKryo(kryo);
     }
   }
 
-  private static void serializePlan(Kryo kryo, Object plan, OutputStream out, boolean cloningPlan) {
+  public static void serializePlan(Kryo kryo, Object plan, OutputStream out) {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
     LOG.info("Serializing " + plan.getClass().getSimpleName() + " using kryo");
-    if (cloningPlan) {
-      serializeObjectByKryo(kryo, plan, out);
-    } else {
-      serializeObjectByKryo(kryo, plan, out);
-    }
+    serializeObjectByKryo(kryo, plan, out);
     perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
   }
 
@@ -609,35 +696,22 @@ public class SerializationUtilities {
    * @return The plan, such as QueryPlan, MapredWork, etc.
    */
   public static <T> T deserializePlan(InputStream in, Class<T> planClass) {
-    return deserializePlan(in, planClass, false);
-  }
-
-  public static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass) {
-    return deserializePlan(kryo, in, planClass, false);
-  }
-
-  private static <T> T deserializePlan(InputStream in, Class<T> planClass, boolean cloningPlan) {
     Kryo kryo = borrowKryo();
     T result = null;
     try {
-      result = deserializePlan(kryo, in, planClass, cloningPlan);
+      result = deserializePlan(kryo, in, planClass);
     } finally {
       releaseKryo(kryo);
     }
     return result;
   }
 
-  private static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass,
-      boolean cloningPlan) {
+  public static <T> T deserializePlan(Kryo kryo, InputStream in, Class<T> planClass) {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
     T plan;
     LOG.info("Deserializing " + planClass.getSimpleName() + " using kryo");
-    if (cloningPlan) {
-      plan = deserializeObjectByKryo(kryo, in, planClass);
-    } else {
-      plan = deserializeObjectByKryo(kryo, in, planClass);
-    }
+    plan = deserializeObjectByKryo(kryo, in, planClass);
     perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);
     return plan;
   }
@@ -654,9 +728,9 @@ public class SerializationUtilities {
     Operator<?> op = plan.getAnyOperator();
     CompilationOpContext ctx = (op == null) ? null : op.getCompilationOpContext();
     ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-    serializePlan(plan, baos, true);
+    serializePlan(plan, baos);
     MapredWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
-        MapredWork.class, true);
+        MapredWork.class);
     // Restore the context.
     for (Operator<?> newOp : newPlan.getAllOperators()) {
       newOp.setCompilationOpContext(ctx);
@@ -676,11 +750,11 @@ public class SerializationUtilities {
     }
     ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
     CompilationOpContext ctx = roots.get(0).getCompilationOpContext();
-    serializePlan(roots, baos, true);
+    serializePlan(roots, baos);
     @SuppressWarnings("unchecked")
     List<Operator<?>> result =
         deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
-            roots.getClass(), true);
+            roots.getClass());
     // Restore the context.
     LinkedList<Operator<?>> newOps = new LinkedList<>(result);
     while (!newOps.isEmpty()) {
@@ -705,9 +779,9 @@ public class SerializationUtilities {
     Operator<?> op = plan.getAnyRootOperator();
     CompilationOpContext ctx = (op == null) ? null : op.getCompilationOpContext();
     ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-    serializePlan(plan, baos, true);
+    serializePlan(plan, baos);
     BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
-        plan.getClass(), true);
+        plan.getClass());
     // Restore the context.
     for (Operator<?> newOp : newPlan.getAllOperators()) {
       newOp.setCompilationOpContext(ctx);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 01f0967..7474603 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -582,7 +582,7 @@ public final class Utilities {
   }
 
   private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) {
-    Kryo kryo = SerializationUtilities.borrowKryo();
+    Kryo kryo = SerializationUtilities.borrowKryo(conf);
     try {
       setPlanPath(conf, hiveScratchDir);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 729d2fc..dbbd8c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.IConfigureJobConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestSerializationUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestSerializationUtilities.java
new file mode 100644
index 0000000..77c0997
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestSerializationUtilities.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSerializationUtilities {
+
+  @Test
+  public void testEveryPropertiesAreSerialized() throws Exception {
+    MapWork mapWork = doSerDeser(null);
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+  }
+
+  @Test
+  public void testRegexFilterAll() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig(".*"));
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", false);
+  }
+
+  @Test
+  public void testRegexFilterSomeProps() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk.*"));
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+  }
+
+  @Test
+  public void testString() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig("rawDataSize"));
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+  }
+
+  @Test
+  public void testStringAndBadRegex() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk,rawDataSize"));
+
+    // impala_intermediate_stats_chunk props are not filtered in this case because only "*" activates regex mode
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2", true);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", true);
+  }
+
+  @Test
+  public void testStringRegexMixed() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk.*,rawDataSize,.*ddl"));
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", false);
+  }
+
+  @Test
+  public void testSkippingAppliesToAllPartitions() throws Exception {
+    MapWork mapWork = doSerDeser(getConfWithSkipConfig("impala_intermediate_stats_chunk.*,rawDataSize,.*ddl"));
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk1",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "impala_intermediate_stats_chunk2",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=0", "serialization.ddl", false);
+
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "impala_intermediate_stats_chunk1",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "impala_intermediate_stats_chunk2",
+        false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "rawDataSize", false);
+    assertPartitionDescPropertyPresence(mapWork, "/warehouse/test_table/p=1", "serialization.ddl", false);
+  }
+
+  private MapWork doSerDeser(Configuration configuration) throws Exception {
+    MapWork mapWork = mockMapWorkWithSomePartitionDescProperties();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    SerializationUtilities.serializePlan(mapWork, baos, configuration);
+    InputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    MapWork mapWorkDeserialized = SerializationUtilities.deserializePlan(bais, MapWork.class);
+    baos.close();
+    bais.close();
+    return mapWorkDeserialized;
+  }
+
+  private Configuration getConfWithSkipConfig(String value) {
+    Configuration configuration = new Configuration();
+    HiveConf.setVar(configuration, HiveConf.ConfVars.HIVE_PLAN_MAPWORK_SERIALIZATION_SKIP_PROPERTIES, value);
+    return configuration;
+  }
+
+  private void assertPartitionDescPropertyPresence(MapWork mapWork, String partitionPath, String prop,
+      boolean isPresent) {
+    String value = mapWork.getPathToPartitionInfo().get(new Path(partitionPath)).getProperties().getProperty(prop);
+    Assert.assertTrue(String.format("'%s' is%ssupposed to be present", prop, (isPresent ? " " : " not ")),
+        isPresent ? value != null : value == null);
+  }
+
+  private static MapWork mockMapWorkWithSomePartitionDescProperties() throws Exception {
+    String tableName = "test_table";
+    int numPartitions = 2;
+    Path root = new Path("/warehouse", "test_table");
+
+    String[] partPath = new String[numPartitions];
+    StringBuilder buffer = new StringBuilder();
+    for (int p = 0; p < numPartitions; ++p) {
+      partPath[p] = new Path(root, "p=" + p).toString();
+      if (p != 0) {
+        buffer.append(',');
+      }
+      buffer.append(partPath[p]);
+    }
+
+    Properties tblProps = new Properties();
+    TableDesc tbl = new TableDesc(OrcInputFormat.class, OrcOutputFormat.class, tblProps);
+
+    MapWork mapWork = new MapWork();
+
+    Map<Path, List<String>> aliasMap = new LinkedHashMap<>();
+    List<String> aliases = new ArrayList<String>();
+    aliases.add(tableName);
+
+    LinkedHashMap<Path, PartitionDesc> partMap = new LinkedHashMap<>();
+    for (int p = 0; p < numPartitions; ++p) {
+      Path path = new Path(partPath[p]);
+      aliasMap.put(path, aliases);
+      LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
+      PartitionDesc part = new PartitionDesc(tbl, partSpec);
+      part.setVectorPartitionDesc(
+          VectorPartitionDesc.createVectorizedInputFileFormat("MockInputFileFormatClassName", false, null));
+
+      part.getProperties().put("impala_intermediate_stats_chunk1", "asdfghjk12345678");
+      part.getProperties().put("impala_intermediate_stats_chunk2", "asdfghjk12345678");
+      part.getProperties().put("rawDataSize", "10");
+      part.getProperties().put("serialization.ddl", "asdf");
+
+      partMap.put(path, part);
+    }
+    mapWork.setPathToAliases(aliasMap);
+    mapWork.setPathToPartitionInfo(partMap);
+
+    return mapWork;
+  }
+}