You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2018/08/31 22:29:55 UTC

impala git commit: IMPALA-7424: Reduce in-memory footprint of incremental stats

Repository: impala
Updated Branches:
  refs/heads/master 13e93e75b -> d4e281b73


IMPALA-7424: Reduce in-memory footprint of incremental stats

Currently incremental stats are stored as chunked Base64 strings in the
HMS parameters map of partition objects. Each of these strings, when
stored in the catalogd, is a Java 'String' object that uses UTF-16
encoding and takes up to 2 bytes per character.

This patch converts the string representation into a deflate-compressed byte
array form when the partition is loaded in the catalogd, and the stats stay
in this form when they are transmitted to the coordinators. To maintain
backward compatibility, the persistent HMS representation of stats has not
been modified, so the incremental stats are still written back in the
chunked Base64 representation when serializing the partition state to
HMS.

On a real world catalogserver dominated by incremental stats memory
footprint, this patch showed ~54% end-to-end heapsize reduction and ~79%
reduction in the memory footprint of incremental stats data structures.

This patch also improves the way callers check whether a partition has
incremental stats by computing this information once and reusing it
later. Without the patch, we deserialize the entire incremental stats
structure every time this information is needed, which triggers a spike
in working-memory usage on catalogds/Impalads.

Testing: Ran core tests on Catalog V1 Implementation. Ran some manual
queries on Catalog V2 implementation.

Change-Id: I39f02ebfa0c6e9b0baedd0d76058a1b34efb5a02
Reviewed-on: http://gerrit.cloudera.org:8080/11341
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d4e281b7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d4e281b7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d4e281b7

Branch: refs/heads/master
Commit: d4e281b734befdd4b4d289526887d1828581925d
Parents: 13e93e7
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Sun Aug 19 22:52:53 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Aug 31 22:11:22 2018 +0000

----------------------------------------------------------------------
 common/thrift/CatalogObjects.thrift             |   9 +-
 common/thrift/CatalogService.thrift             |  15 +++
 .../impala/analysis/ComputeStatsStmt.java       |   7 +-
 .../apache/impala/catalog/FeCatalogUtils.java   |  12 +-
 .../apache/impala/catalog/FeFsPartition.java    |  21 ++--
 .../apache/impala/catalog/HdfsPartition.java    | 108 +++++++++++++-----
 .../org/apache/impala/catalog/HdfsTable.java    |  13 ++-
 .../impala/catalog/PartitionStatsUtil.java      | 112 +++++++++++--------
 .../catalog/local/CatalogdMetaProvider.java     |  22 +++-
 .../catalog/local/DirectMetaProvider.java       |  11 ++
 .../impala/catalog/local/LocalFsPartition.java  |  32 +++---
 .../impala/catalog/local/LocalFsTable.java      |   5 +-
 .../impala/catalog/local/MetaProvider.java      |   2 +
 .../impala/service/CatalogOpExecutor.java       |  10 +-
 .../org/apache/impala/util/CompressionUtil.java |  71 ++++++++++++
 .../impala/catalog/PartialCatalogInfoTest.java  |   1 -
 .../apache/impala/util/CompressionUtilTest.java |  52 +++++++++
 17 files changed, 376 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index cbd0ba1..31dac70 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -294,8 +294,13 @@ struct THdfsPartition {
   // Total file size in bytes of this partition.
   17: optional i64 total_file_size_bytes
 
-  // True, if this partition has incremental stats
-  18: optional bool has_incremental_stats
+  // byte[] representation of TPartitionStats for this partition that is compressed using
+  // 'deflate-compression'.
+  18: optional binary partition_stats
+
+  // Set to true if partition_stats contain intermediate column stats computed via
+  // incremental statistics, false otherwise.
+  19: optional bool has_incremental_stats
 }
 
 // Constant partition ID used for THdfsPartition.prototype_partition below.

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/common/thrift/CatalogService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 98c9b05..48cc748 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -279,6 +279,10 @@ struct TTableInfoSelector {
 
   // List of columns to fetch stats for.
   6: optional list<string> want_stats_for_column_names
+
+  // ... each partition should include the partition stats serialized as a byte[]
+  // and that is deflate-compressed.
+  7: bool want_partition_stats
 }
 
 // Returned information about a particular partition.
@@ -293,6 +297,17 @@ struct TPartialPartitionInfo {
 
   // Set if 'want_partition_files' was set in TTableInfoSelector.
   4: optional list<CatalogObjects.THdfsFileDesc> file_descriptors
+
+  // Deflate-compressed byte[] representation of TPartitionStats for this partition.
+  // Set if 'want_partition_stats' was set in TTableInfoSelector. Not set if the
+  // partition does not have stats.
+  5: optional binary partition_stats
+
+  // Set to true if the partition contains intermediate column stats computed via
+  // incremental statistics. Set when 'want_partition_metadata' is true in
+  // TTableInfoSelector. Incremental stats data can be fetched by setting
+  // 'want_partition_stats' in TTableInfoSelector.
+  6: optional bool has_incremental_stats
 }
 
 // Returned information about a Table, as selected by TTableInfoSelector.

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 3836e4b..b2541cf 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -441,11 +441,7 @@ public class ComputeStatsStmt extends StatementBase {
         Collection<? extends FeFsPartition> allPartitions =
             FeCatalogUtils.loadAllPartitions(hdfsTable);
         for (FeFsPartition p: allPartitions) {
-          TPartitionStats partStats = p.getPartitionStats();
           if (!p.hasIncrementalStats() || tableIsMissingColStats) {
-            if (partStats == null) {
-              if (LOG.isTraceEnabled()) LOG.trace(p.toString() + " does not have stats");
-            }
             if (!tableIsMissingColStats) filterPreds.add(p.getConjunctSql());
             List<String> partValues = Lists.newArrayList();
             for (LiteralExpr partValue: p.getPartitionValues()) {
@@ -454,8 +450,7 @@ public class ComputeStatsStmt extends StatementBase {
             }
             expectedPartitions_.add(partValues);
           } else {
-            if (LOG.isTraceEnabled()) LOG.trace(p.toString() + " does have statistics");
-            validPartStats_.add(partStats);
+            validPartStats_.add(p.getPartitionStats());
           }
         }
         if (expectedPartitions_.size() == hdfsTable.getPartitions().size()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index 2072228..0e76a37 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -308,7 +308,7 @@ public abstract class FeCatalogUtils {
   }
 
   public static THdfsPartition fsPartitionToThrift(FeFsPartition part,
-      ThriftObjectType type, boolean includeIncrementalStats) {
+      ThriftObjectType type, boolean includePartitionStats) {
     HdfsStorageDescriptor sd = part.getInputFormatDescriptor();
     THdfsPartition thriftHdfsPart = new THdfsPartition(
         sd.getLineDelim(),
@@ -325,13 +325,15 @@ public abstract class FeCatalogUtils {
       thriftHdfsPart.setStats(new TTableStats(part.getNumRows()));
       thriftHdfsPart.setAccess_level(part.getAccessLevel());
       thriftHdfsPart.setIs_marked_cached(part.isMarkedCached());
-      thriftHdfsPart.setHas_incremental_stats(part.hasIncrementalStats());
       // IMPALA-4902: Shallow-clone the map to avoid concurrent modifications. One thread
       // may try to serialize the returned THdfsPartition after releasing the table's lock,
       // and another thread doing DDL may modify the map.
-      thriftHdfsPart.setHms_parameters(Maps.newHashMap(
-          includeIncrementalStats ? part.getParameters() :
-            part.getFilteredHmsParameters()));
+      thriftHdfsPart.setHms_parameters(Maps.newHashMap(part.getParameters()));
+      thriftHdfsPart.setHas_incremental_stats(part.hasIncrementalStats());
+      if (includePartitionStats && part.getPartitionStatsCompressed() != null) {
+        thriftHdfsPart.setPartition_stats(
+            Preconditions.checkNotNull(part.getPartitionStatsCompressed()));
+      }
 
       // Add block location information
       long numBlocks = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
index ab6a3eb..ea248a1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
@@ -116,6 +116,13 @@ public interface FeFsPartition {
   boolean hasIncrementalStats();
 
   /**
+   * @return the byte array representation of TPartitionStats for this partition. They
+   * are stored as a deflate-compressed byte array to reduce memory footprint. Use
+   * 'getPartitionStats()' to get the corresponding TPartitionStats object.
+   */
+  byte[] getPartitionStatsCompressed();
+
+ /**
    * @return the size (in bytes) of all the files inside this partition
    */
   long getSize();
@@ -149,17 +156,9 @@ public interface FeFsPartition {
   LiteralExpr getPartitionValue(int pos);
 
   /**
-   * @return the HMS parameters stored for this partition
+   * @return the HMS parameters stored for this partition. Keys that store chunked
+   * TPartitionStats for this partition are not included. To access partition stats, use
+   * getPartitionStatsCompressed().
    */
   Map<String, String> getParameters();
-
-  /**
-   * Returns the HMS partition parameters after filtering out all the partition
-   * incremental stats information.
-   *
-   * TODO(todd): consider _always_ filtering the parameters to remove incremental
-   * stats, and having a different getter which returns the already-decoded stats
-   * which are more memory-efficient anyway.
-   */
-  Map<String, String> getFilteredHmsParameters();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 26ece19..0a356ae 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,6 +37,7 @@ import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.PartitionKeyValue;
 import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.fb.FbCompression;
@@ -517,10 +519,27 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
   private boolean isMarkedCached_ = false;
   private final TAccessLevel accessLevel_;
 
-  // (k,v) pairs of parameters for this partition, stored in the HMS. Used by Impala to
-  // store intermediate state for statistics computations.
+  // (k,v) pairs of parameters for this partition, stored in the HMS.
   private Map<String, String> hmsParameters_;
 
+  // Binary representation of the TPartitionStats for this partition. Populated
+  // when the partition is loaded and updated using setPartitionStatsBytes().
+  private byte[] partitionStats_ = null;
+
+  // True if partitionStats_ has intermediate_col_stats populated.
+  private boolean hasIncrementalStats_ = false;
+
+  // A predicate for checking if a given string is a key used for serializing
+  // TPartitionStats to HMS parameters.
+  private static Predicate<String> IS_INCREMENTAL_STATS_KEY =
+      new Predicate<String>() {
+        @Override
+        public boolean apply(String key) {
+          return key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_NUM_CHUNKS)
+              || key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_CHUNK_PREFIX);
+        }
+      };
+
   @Override // FeFsPartition
   public HdfsStorageDescriptor getInputFormatDescriptor() {
     return fileFormatDescriptor_;
@@ -623,22 +642,63 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     return PartitionStatsUtil.getPartStatsOrWarn(this);
   }
 
-  @Override // FeFsPartition
-  public boolean hasIncrementalStats() {
-    // TODO: performance of this could improve substantially, since
-    // getPartitionStats() performs a bunch of expensive deserialization,
-    // only to end up throwing away the result.
-    TPartitionStats partStats = getPartitionStats();
-    return partStats != null && partStats.intermediate_col_stats != null;
+  public byte[] getPartitionStatsCompressed() {
+    return partitionStats_;
+  }
+
+  public void setPartitionStatsBytes(byte[] partitionStats, boolean hasIncrStats) {
+    if (hasIncrStats) Preconditions.checkNotNull(partitionStats);
+    partitionStats_ = partitionStats;
+    hasIncrementalStats_ = hasIncrStats;
   }
 
+  /**
+   * Helper method that removes the partition stats from hmsParameters_, compresses them
+   * and updates partitionsStats_.
+   */
+  private void extractAndCompressPartStats() {
+    try {
+      // Convert the stats stored in the hmsParams map to a deflate-compressed in-memory
+      // byte array format. After conversion, delete the entries in the hmsParams map
+      // as they are not needed anymore.
+      Reference<Boolean> hasIncrStats = new Reference(false);
+      byte[] partitionStats =
+          PartitionStatsUtil.partStatsBytesFromParameters(hmsParameters_, hasIncrStats);
+      setPartitionStatsBytes(partitionStats, hasIncrStats.getRef());
+    } catch (ImpalaException e) {
+      LOG.warn(String.format("Failed to set partition stats for table %s partition %s",
+          getTable().getFullName(), getPartitionName()), e);
+    } finally {
+      // Delete the incremental stats entries. Cleared even on error conditions so that
+      // we do not persist the corrupt entries in the hmsParameters_ map when it is
+      // flushed to the HMS.
+      Maps.filterKeys(hmsParameters_, IS_INCREMENTAL_STATS_KEY).clear();
+    }
+  }
+
+  public void dropPartitionStats() { setPartitionStatsBytes(null, false); }
+
+  @Override // FeFsPartition
+  public boolean hasIncrementalStats() { return hasIncrementalStats_; }
+
   @Override // FeFsPartition
   public TAccessLevel getAccessLevel() { return accessLevel_; }
 
   @Override // FeFsPartition
-  public Map<String, String> getParameters() { return hmsParameters_; }
+  public Map<String, String> getParameters() {
+    // Once the TPartitionStats in the parameters map are converted to a compressed
+    // format, the hmsParameters_ map should not contain any partition stats keys.
+    // Even though filterKeys() is O(n), we are not worried about the performance here
+    // since hmsParameters_ should be pretty small once the partition stats are removed.
+    Preconditions.checkState(
+        Maps.filterKeys(hmsParameters_, IS_INCREMENTAL_STATS_KEY).isEmpty());
+    return hmsParameters_;
+  }
 
-  public void putToParameters(String k, String v) { hmsParameters_.put(k, v); }
+  public void putToParameters(String k, String v) {
+    Preconditions.checkArgument(!IS_INCREMENTAL_STATS_KEY.apply(k));
+    hmsParameters_.put(k, v);
+  }
   public void putToParameters(Pair<String, String> kv) {
     putToParameters(kv.first, kv.second);
   }
@@ -754,12 +814,16 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
             cachedMsPartitionDescriptor_.sdBucketCols,
             cachedMsPartitionDescriptor_.sdSortCols,
             cachedMsPartitionDescriptor_.sdParameters);
+    // Make a copy so that the callers do not need to delete the incremental stats
+    // strings from the hmsParams_ map later.
+    Map<String, String> hmsParams = Maps.newHashMap(getParameters());
+    PartitionStatsUtil.partStatsToParams(this, hmsParams);
     org.apache.hadoop.hive.metastore.api.Partition partition =
         new org.apache.hadoop.hive.metastore.api.Partition(
             getPartitionValuesAsStrings(true), getTable().getDb().getName(),
             getTable().getName(), cachedMsPartitionDescriptor_.msCreateTime,
             cachedMsPartitionDescriptor_.msLastAccessTime, storageDescriptor,
-            getParameters());
+            hmsParams);
     return partition;
   }
 
@@ -788,6 +852,7 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     } else {
       hmsParameters_ = Maps.newHashMap();
     }
+    extractAndCompressPartStats();
   }
 
   public HdfsPartition(HdfsTable table,
@@ -830,20 +895,6 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
       .toString();
   }
 
-  public static Predicate<String> IS_NOT_INCREMENTAL_STATS_KEY =
-      new Predicate<String>() {
-        @Override
-        public boolean apply(String key) {
-          return !(key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_NUM_CHUNKS)
-              || key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_CHUNK_PREFIX));
-        }
-      };
-
-  @Override
-  public Map<String, String> getFilteredHmsParameters() {
-    return Maps.filterKeys(hmsParameters_, IS_NOT_INCREMENTAL_STATS_KEY);
-  }
-
   public static HdfsPartition fromThrift(HdfsTable table,
       long id, THdfsPartition thriftPartition) {
     HdfsStorageDescriptor storageDesc = HdfsStorageDescriptor.fromThriftPartition(
@@ -901,6 +952,11 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
       partition.hmsParameters_ = Maps.newHashMap();
     }
 
+    partition.hasIncrementalStats_ = thriftPartition.has_incremental_stats;
+    if (thriftPartition.isSetPartition_stats()) {
+      partition.partitionStats_ = thriftPartition.getPartition_stats();
+    }
+
     return partition;
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index d4423d9..282becb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1692,7 +1692,8 @@ public class HdfsTable extends Table implements FeFsTable {
 
     boolean wantPartitionInfo = req.table_info_selector.want_partition_files ||
         req.table_info_selector.want_partition_metadata ||
-        req.table_info_selector.want_partition_names;
+        req.table_info_selector.want_partition_names ||
+        req.table_info_selector.want_partition_stats;
 
     Collection<Long> partIds = req.table_info_selector.partition_ids;
     if (partIds == null && wantPartitionInfo) {
@@ -1715,6 +1716,7 @@ public class HdfsTable extends Table implements FeFsTable {
 
         if (req.table_info_selector.want_partition_metadata) {
           partInfo.hms_partition = part.toHmsPartition();
+          partInfo.setHas_incremental_stats(part.hasIncrementalStats());
         }
 
         if (req.table_info_selector.want_partition_files) {
@@ -1725,6 +1727,10 @@ public class HdfsTable extends Table implements FeFsTable {
           }
         }
 
+        if (req.table_info_selector.want_partition_stats) {
+          partInfo.setPartition_stats(part.getPartitionStatsCompressed());
+        }
+
         resp.table_info.partitions.add(partInfo);
       }
     }
@@ -1762,6 +1768,8 @@ public class HdfsTable extends Table implements FeFsTable {
     int numPartitions =
         (refPartitions == null) ? partitionMap_.values().size() : refPartitions.size();
     memUsageEstimate += numPartitions * PER_PARTITION_MEM_USAGE_BYTES;
+    // TODO(bharath): Revisit the constant STATS_SIZE_PER_COLUMN_BYTES after the
+    // new incremental stats in-memory representation changes.
     long statsSizeEstimate =
         numPartitions * getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
     boolean includeIncrementalStats =
@@ -1773,8 +1781,7 @@ public class HdfsTable extends Table implements FeFsTable {
       if (refPartitions == null || refPartitions.contains(id)) {
         THdfsPartition tHdfsPartition = FeCatalogUtils.fsPartitionToThrift(
             partition, type, includeIncrementalStats);
-        if (tHdfsPartition.isSetHas_incremental_stats() &&
-            tHdfsPartition.isHas_incremental_stats()) {
+        if (partition.hasIncrementalStats()) {
           memUsageEstimate += getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
           hasIncrementalStats_ = true;
         }

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java b/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java
index c77ac91..01ef2d0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java
+++ b/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java
@@ -17,10 +17,12 @@
 
 package org.apache.impala.catalog;
 
+import org.apache.impala.common.Reference;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.common.JniUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.util.CompressionUtil;
 import org.apache.impala.util.MetaStoreUtil;
 
 import java.util.Iterator;
@@ -56,23 +58,31 @@ public class PartitionStatsUtil {
    */
   public static TPartitionStats getPartStatsOrWarn(FeFsPartition part) {
     try {
-      return partStatsFromParameters(part.getParameters());
+      byte[] compressedStats =  part.getPartitionStatsCompressed();
+      if (compressedStats == null) return null;
+      TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
+      TPartitionStats ret = new TPartitionStats();
+      byte[] decompressed = CompressionUtil.deflateDecompress(compressedStats);
+      if (decompressed == null)  {
+        LOG.warn("Error decompressing partition stats for " + part.getPartitionName());
+        return null;
+      }
+      JniUtil.deserializeThrift(protocolFactory, ret, decompressed);
+      return ret;
     } catch (ImpalaException e) {
-      LOG.warn("Could not deserialise incremental stats state for " +
-          part.getPartitionName() +
-          ", consider DROP INCREMENTAL STATS ... PARTITION ... and recomputing " +
-          "incremental stats for this table.");
+      LOG.warn("Bad partition stats for " + part.getPartitionName(), e);
       return null;
     }
   }
 
   /**
-   * Reconstructs a TPartitionStats object from its serialised form in the given parameter
-   * map. Returns null if no stats are serialised, and throws an exception if there was an
-   * error during deserialisation.
+   * Reconstructs the intermediate stats from chunks and returns the corresponding
+   * byte array. The output byte array is deflate-compressed. Sets hasIncrStats to
+   * 'true' if the partition stats contain intermediate col stats.
    */
-  private static TPartitionStats partStatsFromParameters(
-      Map<String, String> hmsParameters) throws ImpalaException {
+  public static byte[] partStatsBytesFromParameters(
+      Map<String, String> hmsParameters, Reference<Boolean> hasIncrStats) throws
+      ImpalaException {
     if (hmsParameters == null) return null;
     String numChunksStr = hmsParameters.get(INCREMENTAL_STATS_NUM_CHUNKS);
     if (numChunksStr == null) return null;
@@ -88,25 +98,50 @@ public class PartitionStatsUtil {
       }
       encodedStats.append(chunk);
     }
-
-    byte[] decodedStats = Base64.decodeBase64(encodedStats.toString());
-    TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
-    TPartitionStats ret = new TPartitionStats();
-    JniUtil.deserializeThrift(protocolFactory, ret, decodedStats);
-    return ret;
+    byte[] decodedBytes = Base64.decodeBase64(encodedStats.toString());
+    TPartitionStats stats = new TPartitionStats();
+    JniUtil.deserializeThrift(new TCompactProtocol.Factory(), stats, decodedBytes);
+    hasIncrStats.setRef(stats.isSetIntermediate_col_stats());
+    return CompressionUtil.deflateCompress(decodedBytes);
   }
 
   /**
-   * Serialises a TPartitionStats object to a partition.
+   * Serialises a TPartitionStats object to a partition. If 'partStats' is null, the
+   * partition's stats are removed.
    */
-  public static void partStatsToParameters(TPartitionStats partStats,
-      HdfsPartition partition) {
-    // null stats means logically delete the stats from this partition
+  public static void partStatsToPartition(TPartitionStats partStats,
+      HdfsPartition partition) throws ImpalaException {
     if (partStats == null) {
-      deletePartStats(partition);
+      partition.setPartitionStatsBytes(null, false);
       return;
     }
 
+    try {
+      TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
+      TSerializer serializer = new TSerializer(protocolFactory);
+      byte[] serialized =
+          CompressionUtil.deflateCompress(serializer.serialize(partStats));
+      partition.setPartitionStatsBytes(
+          serialized, partStats.isSetIntermediate_col_stats());
+    } catch (TException e) {
+      String debugString =
+          String.format("Error saving partition stats: table %s, partition %s",
+          partition.getTable().getFullName(), partition.getPartitionName());
+      LOG.error(debugString, e);
+      throw new ImpalaRuntimeException(debugString, e);
+    }
+  }
+
+  /**
+   * Converts byte[] representation of partition's stats into a chunked string form
+   * appropriate to store in the HMS parameters map. Inserts these chunks into the
+   * given input 'params' map. If we run into any errors deserializing partition stats,
+   * 'params' map is not altered.
+   */
+  public static void partStatsToParams(
+      HdfsPartition partition, Map<String, String> params) {
+    byte[] compressedStats = partition.getPartitionStatsCompressed();
+    if (compressedStats == null) return;
     // The HMS has a 4k (as of Hive 0.13, Impala 2.0) limit on the length of any parameter
     // string.  The serialised version of the partition stats is often larger than this.
     // Therefore, we naively 'chunk' the byte string into 4k pieces, and store the number
@@ -116,31 +151,18 @@ public class PartitionStatsUtil {
     // valid string. This inflates its length somewhat; we may want to consider a
     // different scheme or at least understand why this scheme doesn't seem much more
     // effective than an ASCII representation.
-    try {
-      TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
-      TSerializer serializer = new TSerializer(protocolFactory);
-      byte[] serialized = serializer.serialize(partStats);
-      String base64 = new String(Base64.encodeBase64(serialized));
-      List<String> chunks =
-          chunkStringForHms(base64, MetaStoreUtil.MAX_PROPERTY_VALUE_LENGTH);
-      partition.putToParameters(
-          INCREMENTAL_STATS_NUM_CHUNKS, Integer.toString(chunks.size()));
-      for (int i = 0; i < chunks.size(); ++i) {
-        partition.putToParameters(INCREMENTAL_STATS_CHUNK_PREFIX + i, chunks.get(i));
-      }
-    } catch (TException e) {
-      LOG.error("Error saving partition stats: ", e);
-      // TODO: What to throw here?
+    byte[] decompressed = CompressionUtil.deflateDecompress(compressedStats);
+    if (decompressed  == null)  {
+      LOG.error(
+          "Error decompressing partition stats for " + partition.getPartitionName());
+      return;
     }
-  }
-
-  public static void deletePartStats(HdfsPartition partition) {
-    partition.putToParameters(INCREMENTAL_STATS_NUM_CHUNKS, "0");
-    for (Iterator<String> it = partition.getParameters().keySet().iterator();
-         it.hasNext(); ) {
-      if (it.next().startsWith(INCREMENTAL_STATS_CHUNK_PREFIX)) {
-        it.remove();
-      }
+    String base64 = new String(Base64.encodeBase64(decompressed));
+    List<String> chunks =
+      chunkStringForHms(base64, MetaStoreUtil.MAX_PROPERTY_VALUE_LENGTH);
+    params.put(INCREMENTAL_STATS_NUM_CHUNKS, Integer.toString(chunks.size()));
+    for (int i = 0; i < chunks.size(); ++i) {
+      params.put(INCREMENTAL_STATS_CHUNK_PREFIX + i, chunks.get(i));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 5c1d820..17068e1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -421,6 +421,8 @@ public class CatalogdMetaProvider implements MetaProvider {
     req.table_info_selector.partition_ids = ids;
     req.table_info_selector.want_partition_metadata = true;
     req.table_info_selector.want_partition_files = true;
+    // TODO(todd): fetch incremental stats on-demand for compute-incremental-stats.
+    req.table_info_selector.want_partition_stats = true;
     TGetPartialCatalogObjectResponse resp = sendRequest(req);
     checkResponse(resp.table_info != null && resp.table_info.partitions != null,
         req, "missing partition list result");
@@ -456,7 +458,8 @@ public class CatalogdMetaProvider implements MetaProvider {
         fds.add(fd.cloneWithNewHostIndex(resp.table_info.network_addresses, hostIndex));
       }
       PartitionMetadataImpl metaImpl = new PartitionMetadataImpl(msPart,
-          ImmutableList.copyOf(fds));
+          ImmutableList.copyOf(fds), part.getPartition_stats(),
+          part.has_incremental_stats);
 
       checkResponse(partRef != null, req, "returned unexpected partition id %s", part.id);
 
@@ -575,11 +578,15 @@ public class CatalogdMetaProvider implements MetaProvider {
   public static class PartitionMetadataImpl implements PartitionMetadata {
     private final Partition msPartition_;
     private final ImmutableList<FileDescriptor> fds_;
+    private final byte[] partitionStats_;
+    private final boolean hasIncrementalStats_;
 
-    public PartitionMetadataImpl(Partition msPartition,
-        ImmutableList<FileDescriptor> fds) {
+    public PartitionMetadataImpl(Partition msPartition, ImmutableList<FileDescriptor> fds,
+        byte[] partitionStats, boolean hasIncrementalStats) {
       this.msPartition_ = Preconditions.checkNotNull(msPartition);
       this.fds_ = fds;
+      this.partitionStats_ = partitionStats;
+      this.hasIncrementalStats_ = hasIncrementalStats;
     }
 
     /**
@@ -593,7 +600,8 @@ public class CatalogdMetaProvider implements MetaProvider {
       for (FileDescriptor fd: fds_) {
         fds.add(fd.cloneWithNewHostIndex(origIndex.getList(), dstIndex));
       }
-      return new PartitionMetadataImpl(msPartition_, ImmutableList.copyOf(fds));
+      return new PartitionMetadataImpl(msPartition_, ImmutableList.copyOf(fds),
+          partitionStats_, hasIncrementalStats_);
     }
 
     @Override
@@ -605,6 +613,12 @@ public class CatalogdMetaProvider implements MetaProvider {
     public ImmutableList<FileDescriptor> getFileDescriptors() {
       return fds_;
     }
+
+    /**
+     * Returns the partition stats bytes this object was constructed with. The array is
+     * returned as-is (no copy) and may be null, since the constructor does not
+     * null-check it.
+     */
+    @Override
+    public byte[] getPartitionStats() { return partitionStats_; }
+
+    /**
+     * Returns the flag supplied at construction time. Nothing is computed or
+     * deserialized here.
+     */
+    @Override
+    public boolean hasIncrementalStats() { return hasIncrementalStats_; }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index c1e3675..8bba217 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -342,6 +342,17 @@ class DirectMetaProvider implements MetaProvider {
     public ImmutableList<FileDescriptor> getFileDescriptors() {
       return fds_;
     }
+
+    /**
+     * Always reports false for partitions loaded through this provider.
+     */
+    @Override
+    public boolean hasIncrementalStats() {
+      return false; /* Incremental stats not supported in direct mode */
+    }
+
+    /**
+     * Not supported by this provider.
+     *
+     * NOTE(review): LocalFsTable passes p.getPartitionStats() for every
+     * PartitionMetadata it turns into a LocalFsPartition, so loading partitions
+     * through this provider would hit this exception. Confirm that path is never
+     * reached in direct mode, or consider returning null here to match
+     * hasIncrementalStats() returning false.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public byte[] getPartitionStats() {
+      throw new UnsupportedOperationException("Incremental stats not supported with " +
+          "DirectMetaProvider implementation.");
+    }
   }
 
   private class TableMetaRefImpl implements TableMetaRef {

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
index 9cf2d8b..c594873 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
@@ -47,18 +47,28 @@ public class LocalFsPartition implements FeFsPartition {
   private final LocalFsTable table_;
   private final LocalPartitionSpec spec_;
   private final Partition msPartition_;
+
   /**
    * Null in the case of a 'prototype partition'.
    */
   @Nullable
   private final ImmutableList<FileDescriptor> fileDescriptors_;
 
+  @Nullable
+  private final byte[] partitionStats_;
+
+  // True if partitionStats_ has intermediate_col_stats populated.
+  private final boolean hasIncrementalStats_;
+
   public LocalFsPartition(LocalFsTable table, LocalPartitionSpec spec,
-      Partition msPartition, ImmutableList<FileDescriptor> fileDescriptors) {
+      Partition msPartition, ImmutableList<FileDescriptor> fileDescriptors,
+      byte [] partitionStats, boolean hasIncrementalStats) {
     table_ = Preconditions.checkNotNull(table);
     spec_ = Preconditions.checkNotNull(spec);
     msPartition_ = Preconditions.checkNotNull(msPartition);
     fileDescriptors_ = fileDescriptors;
+    partitionStats_ = partitionStats;
+    hasIncrementalStats_ = hasIncrementalStats;
   }
 
   @Override
@@ -158,13 +168,10 @@ public class LocalFsPartition implements FeFsPartition {
   }
 
   @Override
-  public boolean hasIncrementalStats() {
-    // TODO(todd): copy-paste from HdfsPartition
-    // TODO(todd): as in the equivalent method in HdfsPartition, this is
-    // expensive because it deserializes the stats completely.
-    TPartitionStats partStats = getPartitionStats();
-    return partStats != null && partStats.intermediate_col_stats != null;
-  }
+  public boolean hasIncrementalStats() { return hasIncrementalStats_; }
+
+  /**
+   * Returns the stats bytes handed to the constructor, without copying or decoding
+   * them. Null when none were supplied, e.g. for the prototype partition.
+   * NOTE(review): presumably the deflate-compressed form produced by
+   * CompressionUtil -- confirm against the producer of these bytes.
+   */
+  @Override // FeFsPartition
+  public byte[] getPartitionStatsCompressed() { return partitionStats_; }
 
   @Override
   public long getSize() {
@@ -204,13 +211,4 @@ public class LocalFsPartition implements FeFsPartition {
   public Map<String, String> getParameters() {
     return msPartition_.getParameters();
   }
-
-  @Override
-  public Map<String, String> getFilteredHmsParameters() {
-    // TODO(todd): for now, copied from HdfsPartition. Eventually we would want to
-    // lazy-fetch these parameters separately for only the cases that require them,
-    // since they are quite large.
-    return Maps.filterKeys(getParameters(),
-        HdfsPartition.IS_NOT_INCREMENTAL_STATS_KEY);
-  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 10021e0..3ace234 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -336,7 +336,8 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     LocalPartitionSpec spec = new LocalPartitionSpec(
         this, CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
     LocalFsPartition prototypePartition = new LocalFsPartition(
-        this, spec, protoMsPartition, /*fileDescriptors=*/null);
+        this, spec, protoMsPartition, /*fileDescriptors=*/null, /*partitionStats=*/null,
+        /*hasIncrementalStats=*/false);
     return prototypePartition;
   }
 
@@ -411,7 +412,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
       }
 
       LocalFsPartition part = new LocalFsPartition(this, spec, p.getHmsPartition(),
-          p.getFileDescriptors());
+          p.getFileDescriptors(), p.getPartitionStats(), p.hasIncrementalStats());
       ret.add(part);
     }
     return ret;

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 72e8f03..c5497bd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -119,5 +119,7 @@ interface MetaProvider {
   interface PartitionMetadata {
     Partition getHmsPartition();
     ImmutableList<FileDescriptor> getFileDescriptors();
+    /**
+     * Partition stats as an opaque byte array. CatalogdMetaProvider fills this from
+     * the catalogd response; DirectMetaProvider throws UnsupportedOperationException
+     * instead of returning a value.
+     */
+    byte[] getPartitionStats();
+    /**
+     * Whether the partition has incremental stats. Implementations return a
+     * precomputed or constant value, so callers do not need to deserialize the
+     * stats bytes to answer this.
+     */
+    boolean hasIncrementalStats();
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ad3add6..060469e 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -856,7 +856,7 @@ public class CatalogOpExecutor {
         LOG.trace(String.format("Updating stats for partition %s: numRows=%s",
             partition.getValuesAsString(), numRows));
       }
-      PartitionStatsUtil.partStatsToParameters(partitionStats, partition);
+      PartitionStatsUtil.partStatsToPartition(partitionStats, partition);
       partition.putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
       // HMS requires this param for stats changes to take effect.
       partition.putToParameters(MetastoreShim.statsGeneratedViaStatsTaskParam());
@@ -1146,8 +1146,8 @@ public class CatalogOpExecutor {
         }
 
         for(HdfsPartition partition : partitions) {
-          if (partition.getPartitionStats() != null) {
-            PartitionStatsUtil.deletePartStats(partition);
+          if (partition.getPartitionStatsCompressed() != null) {
+            partition.dropPartitionStats();
             try {
               applyAlterPartition(table, partition);
             } finally {
@@ -1233,8 +1233,8 @@ public class CatalogOpExecutor {
       // TODO(todd): avoid downcast
       HdfsPartition part = (HdfsPartition) fePart;
       boolean isModified = false;
-      if (part.getPartitionStats() != null) {
-        PartitionStatsUtil.deletePartStats(part);
+      if (part.getPartitionStatsCompressed() != null) {
+        part.dropPartitionStats();
         isModified = true;
       }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/main/java/org/apache/impala/util/CompressionUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/CompressionUtil.java b/fe/src/main/java/org/apache/impala/util/CompressionUtil.java
new file mode 100644
index 0000000..86d8477
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/CompressionUtil.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CompressionUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompressionUtil.class);
+
+  /**
+   * Compresses the given bytes with deflate at the BEST_COMPRESSION level.
+   *
+   * @param input bytes to compress; may be null
+   * @return the compressed bytes, or null if the input is null or compression fails
+   *     (the failure is logged)
+   */
+  public static byte[] deflateCompress(byte[] input) {
+    if (input == null) return null;
+    ByteArrayOutputStream bos = new ByteArrayOutputStream(input.length);
+    // TODO: Benchmark other compression levels.
+    Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
+    // Closing the stream (done by try-with-resources) finishes the deflate output, so
+    // 'bos' holds the complete result once the try block exits normally.
+    try (DeflaterOutputStream stream = new DeflaterOutputStream(bos, deflater)) {
+      stream.write(input);
+    } catch (IOException e) {
+      LOG.error("Error compressing input bytes.", e);
+      return null;
+    } finally {
+      // DeflaterOutputStream.close() does not end a caller-supplied Deflater, so
+      // release its resources explicitly instead of waiting for GC.
+      deflater.end();
+    }
+    return bos.toByteArray();
+  }
+
+  /**
+   * Decompresses deflate-compressed bytes, such as those produced by
+   * deflateCompress().
+   *
+   * @param input compressed bytes; may be null
+   * @return the decompressed bytes, or null if the input is null or cannot be
+   *     decompressed (the failure is logged)
+   */
+  public static byte[] deflateDecompress(byte[] input) {
+    if (input == null) return null;
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // Closing the InflaterInputStream ends the Inflater it created internally.
+    try (InflaterInputStream in =
+        new InflaterInputStream(new ByteArrayInputStream(input))) {
+      IOUtils.copy(in, out);
+    } catch (IOException e) {
+      LOG.error("Error decompressing input bytes.", e);
+      return null;
+    }
+    return out.toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
index 2ff5015..6dba475 100644
--- a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
@@ -123,7 +123,6 @@ public class PartialCatalogInfoTest {
     // TODO(todd): we should probably transfer a compressed descriptor instead
     // and refactor the MetaProvider interface to expose those since there is
     // a lot of redundant info in partition descriptors.
-    // TODO(todd): should also filter out the incremental stats.
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/impala/blob/d4e281b7/fe/src/test/java/org/apache/impala/util/CompressionUtilTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/util/CompressionUtilTest.java b/fe/src/test/java/org/apache/impala/util/CompressionUtilTest.java
new file mode 100644
index 0000000..ff08a08
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/CompressionUtilTest.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import com.google.common.base.Strings;
+import org.apache.impala.common.JniUtil;
+import org.apache.impala.thrift.TCacheJarParams;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CompressionUtilTest {
+  private static final TBinaryProtocol.Factory protocolFactory_ =
+      new TBinaryProtocol.Factory();
+
+  /**
+   * Round-trips strings and a serialized thrift struct through CompressionUtil and
+   * checks that the original content comes back.
+   */
+  @Test
+  public void testCompressionUtil() throws Exception {
+    // Null input is passed through as null in both directions.
+    Assert.assertNull(CompressionUtil.deflateCompress(null));
+    Assert.assertNull(CompressionUtil.deflateDecompress(null));
+
+    // Compress and decompress simple strings.
+    String[] stringsToTest = {"", "TestString", Strings.repeat("x", 100 * 1024 * 1024)};
+    for (String test: stringsToTest) {
+      byte[] compressed = CompressionUtil.deflateCompress(test.getBytes());
+      Assert.assertNotNull(compressed);
+      byte[] decompressed = CompressionUtil.deflateDecompress(compressed);
+      // Fail with an assertion rather than an NPE from the String constructor if
+      // decompression reported an error by returning null.
+      Assert.assertNotNull(decompressed);
+      Assert.assertEquals(test, new String(decompressed));
+    }
+
+    // Compress and decompress a thrift struct.
+    TCacheJarParams testObject = new TCacheJarParams("test string");
+    byte[] testObjBytes = JniUtil.serializeToThrift(testObject, protocolFactory_);
+
+    byte[] compressed = CompressionUtil.deflateCompress(testObjBytes);
+    byte[] decompressed = CompressionUtil.deflateDecompress(compressed);
+
+    TCacheJarParams deserializedTestObj = new TCacheJarParams();
+    JniUtil.deserializeThrift(protocolFactory_, deserializedTestObj, decompressed);
+    Assert.assertEquals("test string", deserializedTestObj.hdfs_location);
+  }
+
+}