You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/11/07 07:15:49 UTC

[04/10] asterixdb git commit: [ASTERIXDB-2422][STO] Introduce compressed storage

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index ed864cc..e52449c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -11121,4 +11121,63 @@
     </test-case>
   </test-group>
   &GeoQueries;
+  <test-group name="compression">
+    <test-case FilePath="compression">
+      <compilation-unit name="incompressible-pages/large-page">
+        <output-dir compare="Text">incompressible-pages/large-page</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="compression">
+      <compilation-unit name="incompressible-pages/small-page">
+        <output-dir compare="Text">incompressible-pages/small-page</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="compression">
+      <compilation-unit name="invalid-compression-scheme">
+        <output-dir compare="Text">invalid-compression-scheme</output-dir>
+        <expected-error>ASX1096: Unknown compression scheme zip. Supported schemes are [snappy,none]</expected-error>
+        <source-location>false</source-location>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="compression">
+      <compilation-unit name="scheme-none">
+        <output-dir compare="Text">scheme-none</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="compression">
+      <compilation-unit name="scheme-snappy">
+        <output-dir compare="Text">scheme-snappy</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+  <test-group name="ddl-with-clause">
+    <test-case FilePath="ddl-with-clause">
+      <compilation-unit name="missing-non-optional">
+        <output-dir compare="Text">missing-non-optional</output-dir>
+        <expected-error>ASX1061: Field "merge-policy.name" in the with clause cannot be null or missing</expected-error>
+        <source-location>false</source-location>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="ddl-with-clause">
+      <compilation-unit name="type-mismatch">
+        <output-dir compare="Text">type-mismatch</output-dir>
+        <expected-error>ASX1060: Field "merge-policy.parameters.max-mergable-component-size" in the with clause must be of type bigint, but found string</expected-error>
+        <source-location>false</source-location>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="ddl-with-clause">
+      <compilation-unit name="unsupported-field">
+        <output-dir compare="Text">unsupported-field</output-dir>
+        <expected-error>ASX1059: Field(s) [unknown-field] unsupported in the with clause</expected-error>
+        <source-location>false</source-location>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="ddl-with-clause">
+      <compilation-unit name="unsupported-subfield">
+        <output-dir compare="Text">unsupported-subfield</output-dir>
+        <expected-error>ASX1097: Subfield(s) [unknown-subfield] in "merge-policy" unsupported in the with clause</expected-error>
+        <source-location>false</source-location>
+      </compilation-unit>
+    </test-case>
+  </test-group>
 </test-suite>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 85e44ea..6bd73e7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -22,6 +22,7 @@ import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
 import static org.apache.hyracks.control.common.config.OptionTypes.UNSIGNED_INTEGER;
 import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE;
 
@@ -47,7 +48,8 @@ public class StorageProperties extends AbstractProperties {
         STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS(POSITIVE_INTEGER, 2),
         STORAGE_METADATA_MEMORYCOMPONENT_NUMPAGES(POSITIVE_INTEGER, 8),
         STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d),
-        STORAGE_MAX_ACTIVE_WRITABLE_DATASETS(UNSIGNED_INTEGER, 8);
+        STORAGE_MAX_ACTIVE_WRITABLE_DATASETS(UNSIGNED_INTEGER, 8),
+        STORAGE_COMPRESSION_BLOCK(STRING, "none");
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -88,6 +90,8 @@ public class StorageProperties extends AbstractProperties {
                     return "The maximum acceptable false positive rate for bloom filters associated with LSM indexes";
                 case STORAGE_MAX_ACTIVE_WRITABLE_DATASETS:
                     return "The maximum number of datasets that can be concurrently modified";
+                case STORAGE_COMPRESSION_BLOCK:
+                    return "The default compression scheme for the storage";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -179,6 +183,10 @@ public class StorageProperties extends AbstractProperties {
         return accessor.getInt(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS);
     }
 
+    public String getCompressionScheme() {
+        return accessor.getString(Option.STORAGE_COMPRESSION_BLOCK);
+    }
+
     protected int getMetadataDatasets() {
         return MetadataIndexImmutableProperties.METADATA_DATASETS_COUNT;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 18e3327..81ae3e1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -21,14 +21,15 @@ package org.apache.asterix.common.dataflow;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.api.INodeJobTracker;
-import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
+import org.apache.asterix.common.storage.ICompressionManager;
 import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.storage.common.IStorageManager;
@@ -127,4 +128,9 @@ public interface ICcApplicationContext extends IApplicationContext {
      * @return the transaction id factory
      */
     ITxnIdFactory getTxnIdFactory();
+
+    /**
+     * @return the compression manager
+     */
+    ICompressionManager getCompressionManager();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 0bf446e..54fd65c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -147,7 +147,7 @@ public class ErrorCode {
     public static final int UNSUPPORTED_WITH_FIELD = 1059;
     public static final int WITH_FIELD_MUST_BE_OF_TYPE = 1060;
     public static final int WITH_FIELD_MUST_CONTAIN_SUB_FIELD = 1061;
-    public static final int MERGE_POLICY_PARAMETER_INVALID_TYPE = 1062;
+    public static final int CONFIGURATION_PARAMETER_INVALID_TYPE = 1062;
     public static final int UNKNOWN_DATAVERSE = 1063;
     public static final int ERROR_OCCURRED_BETWEEN_TWO_TYPES_CONVERSION = 1064;
     public static final int CHOSEN_INDEX_COUNT_SHOULD_BE_GREATER_THAN_ONE = 1065;
@@ -181,6 +181,8 @@ public class ErrorCode {
     public static final int COMPILATION_TRANSLATION_ERROR = 1093;
     public static final int RANGE_MAP_ERROR = 1094;
     public static final int COMPILATION_EXPECTED_FUNCTION_CALL = 1095;
+    public static final int UNKNOWN_COMPRESSION_SCHEME = 1096;
+    public static final int UNSUPPORTED_WITH_SUBFIELD = 1097;
 
     // Feed errors
     public static final int DATAFLOW_ILLEGAL_STATE = 3001;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java
new file mode 100644
index 0000000..b9a0baf
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+
+/**
+ * An interface for the compression manager which handles all the registered
+ * schemes and validates the provided configurations.
+ */
+public interface ICompressionManager {
+
+    /**
+     * Get a registered CompressorDecompressorFactory
+     *
+     * @param schemeName
+     *            Compression scheme name
+     * @return Compressor/Decompressor factory if the scheme is specified or NOOP otherwise
+     * @throws CompilationException
+     */
+    ICompressorDecompressorFactory getFactory(String schemeName) throws CompilationException;
+
+    /**
+     * Get the specified compression scheme in the DDL or the default one
+     *
+     * @param ddlScheme
+     *            Compression scheme name from DDL
+     * @return DDL or default compression scheme name
+     * @throws CompilationException
+     */
+    String getDdlOrDefaultCompressionScheme(String ddlScheme) throws CompilationException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 8c17ec6..7aa9b91 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -132,10 +132,10 @@
 1056 = Too many options were specified for %1$s
 1057 = Expression of type %1$s is not supported in constant record
 1058 = Literal of type %1$s is not supported in constant record
-1059 = Field \"%1$s\" unsupported in the with clause
-1060 = Field \"%1$s\" in the with clause must be of type %2$s
-1061 = Field \"%1$s\" in the with clause must contain sub field \"%2$s\"
-1062 = Merge policy parameters cannot be of type %1$s
+1059 = Field(s) %1$s unsupported in the with clause
+1060 = Field \"%1$s\" in the with clause must be of type %2$s, but found %3$s
+1061 = Field \"%1$s\" in the with clause cannot be null or missing
+1062 = Configuration parameter cannot be of type %1$s
 1063 = Cannot find dataverse with name %1$s
 1064 = An error was occurred while converting type %1$s to type %2$s.
 1065 = There should be at least two applicable indexes.
@@ -168,6 +168,8 @@
 1093 = A parser error has occurred. The detail exception: %1$s
 1094 = Cannot parse range map: %1$s
 1095 = Expected function call
+1096 = Unknown compression scheme %1$s. Supported schemes are %2$s
+1097 = Subfield(s) %1$s in \"%2$s\" unsupported in the with clause
 
 # Feed Errors
 3001 = Illegal state.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/pom.xml b/asterixdb/asterix-lang-common/pom.xml
index dde41e0..bb153a5 100644
--- a/asterixdb/asterix-lang-common/pom.xml
+++ b/asterixdb/asterix-lang-common/pom.xml
@@ -107,5 +107,9 @@
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-data-std</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
index 3d5b815..9f01b1c 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
@@ -18,19 +18,19 @@
  */
 package org.apache.asterix.lang.common.statement;
 
+import java.util.Map;
+
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.lang.common.base.AbstractStatement;
 import org.apache.asterix.lang.common.expression.RecordConstructor;
 import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.util.ConfigurationUtil;
 import org.apache.asterix.lang.common.util.ExpressionUtils;
-import org.apache.asterix.lang.common.util.MergePolicyUtils;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.object.base.AdmObjectNode;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
-import java.util.Map;
-
 /**
  * The new create feed statement only concerns the feed adaptor configuration.
  * All feeds are considered as primary feeds.
@@ -76,7 +76,7 @@ public class CreateFeedStatement extends AbstractStatement {
     }
 
     public Map<String, String> getConfiguration() throws CompilationException {
-        return MergePolicyUtils.toProperties(withObjectNode);
+        return ConfigurationUtil.toProperties(withObjectNode);
     }
 
     public AdmObjectNode getWithObjectNode() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
index 4aeb6d3..0a17b24 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
@@ -18,31 +18,22 @@
  */
 package org.apache.asterix.lang.common.statement;
 
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.lang.common.base.AbstractStatement;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.expression.RecordConstructor;
 import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.util.ExpressionUtils;
-import org.apache.asterix.lang.common.util.MergePolicyUtils;
+import org.apache.asterix.lang.common.util.ConfigurationUtil;
+import org.apache.asterix.lang.common.util.DatasetDeclParametersUtil;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.object.base.AdmObjectNode;
-import org.apache.asterix.object.base.AdmStringNode;
 import org.apache.asterix.object.base.IAdmNode;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.asterix.runtime.compression.CompressionManager;
 
 public class DatasetDecl extends AbstractStatement {
-    protected static final String[] WITH_OBJECT_FIELDS = new String[] { MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME };
-    protected static final Set<String> WITH_OBJECT_FIELDS_SET = new HashSet<>(Arrays.asList(WITH_OBJECT_FIELDS));
-
     protected final Identifier name;
     protected final Identifier dataverse;
     protected final Identifier itemTypeDataverse;
@@ -76,14 +67,7 @@ public class DatasetDecl extends AbstractStatement {
         }
         this.nodegroupName = nodeGroupName;
         this.hints = hints;
-        try {
-            this.withObjectNode = withRecord == null ? null : ExpressionUtils.toNode(withRecord);
-        } catch (CompilationException e) {
-            throw e;
-        } catch (AlgebricksException e) {
-            // TODO(tillw) make signatures throw Algebricks exceptions
-            throw new CompilationException(e);
-        }
+        this.withObjectNode = DatasetDeclParametersUtil.validateAndGetWithObjectNode(withRecord);
         this.ifNotExists = ifNotExists;
         this.datasetType = datasetType;
         this.datasetDetailsDecl = idd;
@@ -141,50 +125,17 @@ public class DatasetDecl extends AbstractStatement {
         return nodegroupName;
     }
 
-    public String getCompactionPolicy() throws CompilationException {
-        AdmObjectNode mergePolicy = getMergePolicyObject();
-        if (mergePolicy == null) {
-            return null;
-        }
-        IAdmNode mergePolicyName = mergePolicy.get(MergePolicyUtils.MERGE_POLICY_NAME_PARAMETER_NAME);
-        if (mergePolicyName == null) {
-            throw new CompilationException(ErrorCode.WITH_FIELD_MUST_CONTAIN_SUB_FIELD,
-                    MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME, MergePolicyUtils.MERGE_POLICY_NAME_PARAMETER_NAME);
-        }
-        if (mergePolicyName.getType() != ATypeTag.STRING) {
-            throw new CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE,
-                    MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME + '.'
-                            + MergePolicyUtils.MERGE_POLICY_NAME_PARAMETER_NAME,
-                    ATypeTag.STRING);
-        }
-        return ((AdmStringNode) mergePolicyName).get();
+    private AdmObjectNode getMergePolicyObject() {
+        return (AdmObjectNode) withObjectNode.get(DatasetDeclParametersUtil.MERGE_POLICY_PARAMETER_NAME);
     }
 
-    private static AdmObjectNode validateWithObject(AdmObjectNode withObject) throws CompilationException {
-        if (withObject == null) {
-            return null;
-        }
-        for (String name : withObject.getFieldNames()) {
-            if (!WITH_OBJECT_FIELDS_SET.contains(name)) {
-                throw new CompilationException(ErrorCode.UNSUPPORTED_WITH_FIELD, name);
-            }
-        }
-        return withObject;
-    }
-
-    private AdmObjectNode getMergePolicyObject() throws CompilationException {
-        if (withObjectNode == null) {
-            return null;
-        }
-        IAdmNode mergePolicy = validateWithObject(withObjectNode).get(MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME);
+    public String getCompactionPolicy() {
+        AdmObjectNode mergePolicy = getMergePolicyObject();
         if (mergePolicy == null) {
             return null;
         }
-        if (!mergePolicy.isObject()) {
-            throw new CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE,
-                    MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME, ATypeTag.OBJECT);
-        }
-        return (AdmObjectNode) mergePolicy;
+
+        return mergePolicy.getOptionalString(DatasetDeclParametersUtil.MERGE_POLICY_NAME_PARAMETER_NAME);
     }
 
     public Map<String, String> getCompactionPolicyProperties() throws CompilationException {
@@ -192,17 +143,26 @@ public class DatasetDecl extends AbstractStatement {
         if (mergePolicy == null) {
             return null;
         }
-        IAdmNode mergePolicyParameters = mergePolicy.get(MergePolicyUtils.MERGE_POLICY_PARAMETERS_PARAMETER_NAME);
+        IAdmNode mergePolicyParameters =
+                mergePolicy.get(DatasetDeclParametersUtil.MERGE_POLICY_PARAMETERS_PARAMETER_NAME);
         if (mergePolicyParameters == null) {
             return null;
         }
-        if (mergePolicyParameters.getType() != ATypeTag.OBJECT) {
-            throw new CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE,
-                    MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME + '.'
-                            + MergePolicyUtils.MERGE_POLICY_PARAMETERS_PARAMETER_NAME,
-                    ATypeTag.OBJECT);
+        return ConfigurationUtil.toProperties((AdmObjectNode) mergePolicyParameters);
+    }
+
+    public String getDatasetCompressionScheme() {
+        if (datasetType != DatasetType.INTERNAL) {
+            return CompressionManager.NONE;
+        }
+
+        final AdmObjectNode storageBlockCompression =
+                (AdmObjectNode) withObjectNode.get(DatasetDeclParametersUtil.STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME);
+        if (storageBlockCompression == null) {
+            return null;
         }
-        return MergePolicyUtils.toProperties((AdmObjectNode) mergePolicyParameters);
+        return storageBlockCompression
+                .getOptionalString(DatasetDeclParametersUtil.STORAGE_BLOCK_COMPRESSION_SCHEME_PARAMETER_NAME);
     }
 
     public Map<String, String> getHints() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java
new file mode 100644
index 0000000..4d0baeb
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.util;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.object.base.AdmArrayNode;
+import org.apache.asterix.object.base.AdmObjectNode;
+import org.apache.asterix.object.base.IAdmNode;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+class ConfigurationTypeValidator {
+    //Error information
+    private final Set<String> unknownFieldNames;
+    private final Deque<String> path;
+    private final MutablePair<ATypeTag, ATypeTag> expectedActualTypePair;
+    private ErrorType result;
+
+    public enum ErrorType {
+        UNKNOWN_FIELD_NAMES,
+        TYPE_MISMATCH,
+        MISSING_UNOPTIONAL_FIELD
+    }
+
+    protected ConfigurationTypeValidator() {
+        unknownFieldNames = new HashSet<>();
+        path = new ArrayDeque<>();
+        expectedActualTypePair = new MutablePair<>(null, null);
+    }
+
+    public void validateType(IAType type, IAdmNode node) throws CompilationException {
+        if (!validate(type, node)) {
+            throwException();
+        }
+    }
+
+    private boolean validate(IAType type, IAdmNode node) {
+        if (type.getTypeTag().isDerivedType()) {
+            return validateDerivedType(type, node);
+        } else if (node == null) {
+            result = ErrorType.MISSING_UNOPTIONAL_FIELD;
+            return false;
+        } else if (type.getTypeTag() != node.getType()) {
+            setExpectedAndActualType(type.getTypeTag(), node.getType());
+            return false;
+        }
+
+        return true;
+    }
+
+    private boolean validateDerivedType(IAType type, IAdmNode node) {
+        final ATypeTag typeTag = type.getTypeTag();
+        switch (typeTag) {
+            case UNION:
+                return validateUnionType(type, node);
+            case OBJECT:
+                return validateObject(type, node);
+            case ARRAY:
+                return validateArray(type, node);
+            default:
+                throw new IllegalStateException("Unsupported derived type: " + typeTag);
+        }
+    }
+
+    private boolean validateUnionType(IAType type, IAdmNode node) {
+        if (node == null || node.getType() == ATypeTag.NULL) {
+            return true;
+        }
+        return validate(((AUnionType) type).getActualType(), node);
+    }
+
+    private boolean validateObject(IAType type, IAdmNode node) {
+        if (node.getType() != ATypeTag.OBJECT) {
+            setExpectedAndActualType(ATypeTag.OBJECT, node.getType());
+            return false;
+        }
+
+        final ARecordType recordType = (ARecordType) type;
+        final AdmObjectNode objectNode = (AdmObjectNode) node;
+
+        final String[] fieldNames = recordType.getFieldNames();
+
+        //Check field names
+        final Set<String> definedFieldNames = new HashSet<>(Arrays.asList(fieldNames));
+        final Set<String> objectFieldNames = objectNode.getFieldNames();
+        if (!definedFieldNames.containsAll(objectFieldNames)) {
+            setUnknownFieldNames(definedFieldNames, objectFieldNames);
+            return false;
+        }
+
+        final IAType[] fieldTypes = recordType.getFieldTypes();
+
+        for (int i = 0; i < fieldTypes.length; i++) {
+            if (!validate(fieldTypes[i], objectNode.get(fieldNames[i]))) {
+                addToPath(fieldNames[i]);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean validateArray(IAType type, IAdmNode node) {
+        if (node.getType() != ATypeTag.ARRAY) {
+            setExpectedAndActualType(ATypeTag.ARRAY, node.getType());
+            return false;
+        }
+
+        final IAType itemType = ((AOrderedListType) type).getItemType();
+        final AdmArrayNode array = (AdmArrayNode) node;
+        for (int i = 0; i < array.size(); i++) {
+            if (!validate(itemType, array.get(i))) {
+                addToPath(i);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void setUnknownFieldNames(Set<String> definedFieldNames, Set<String> objectFieldNames) {
+        unknownFieldNames.addAll(objectFieldNames);
+        unknownFieldNames.removeAll(definedFieldNames);
+        result = ErrorType.UNKNOWN_FIELD_NAMES;
+    }
+
+    private void setExpectedAndActualType(ATypeTag expectedTypeTag, ATypeTag actualTypeTag) {
+        expectedActualTypePair.left = expectedTypeTag;
+        expectedActualTypePair.right = actualTypeTag;
+        result = ErrorType.TYPE_MISMATCH;
+    }
+
+    private void addToPath(String fieldName) {
+        if (path.isEmpty()) {
+            path.push(fieldName);
+        } else {
+            path.push(fieldName + ".");
+        }
+    }
+
+    private void addToPath(int arrayIndex) {
+        path.push("[" + arrayIndex + "]");
+    }
+
+    private void throwException() throws CompilationException {
+        final StringBuilder pathBuilder = new StringBuilder();
+        while (!path.isEmpty()) {
+            pathBuilder.append(path.pop());
+        }
+        switch (result) {
+            case UNKNOWN_FIELD_NAMES:
+                if (pathBuilder.length() > 0) {
+                    throw new CompilationException(ErrorCode.UNSUPPORTED_WITH_SUBFIELD, unknownFieldNames.toString(),
+                            pathBuilder.toString());
+                }
+                throw new CompilationException(ErrorCode.UNSUPPORTED_WITH_FIELD, unknownFieldNames.toString());
+            case TYPE_MISMATCH:
+                throw new CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE, pathBuilder.toString(),
+                        expectedActualTypePair.left, expectedActualTypePair.right);
+            case MISSING_UNOPTIONAL_FIELD:
+                throw new CompilationException(ErrorCode.WITH_FIELD_MUST_CONTAIN_SUB_FIELD, pathBuilder.toString());
+
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java
new file mode 100644
index 0000000..4bb799b
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.object.base.AdmObjectNode;
+import org.apache.asterix.object.base.AdmStringNode;
+import org.apache.asterix.object.base.IAdmNode;
+
+public class ConfigurationUtil {
+
+    private ConfigurationUtil() {
+    }
+
+    /**
+     * Convert the fields of the parameters object to a Map<String,String>
+     * This method should go away once we store the with object as it is in storage
+     *
+     * @param parameters
+     *            the parameters object taken from the with clause (e.g. the merge policy parameters)
+     * @return a new map with one entry per field: the field name mapped to the string form of its value
+     */
+    public static Map<String, String> toProperties(AdmObjectNode parameters) throws CompilationException {
+        final Map<String, String> properties = new HashMap<>();
+        for (Entry<String, IAdmNode> entry : parameters.getFields()) {
+            final String key = entry.getKey();
+            properties.put(key, getStringValue(entry.getValue()));
+        }
+        return properties;
+    }
+
+    /**
+     * Get string value of {@link IAdmNode}
+     *
+     * @param value
+     *            IAdmNode value should be of type boolean, double, bigint or string
+     * @return
+     *         the stored string for a string node, <code>value.toString()</code> for the other accepted types
+     * @throws CompilationException if <code>value</code> is of any other type
+     */
+    public static String getStringValue(IAdmNode value) throws CompilationException {
+        switch (value.getType()) {
+            case STRING:
+                return ((AdmStringNode) value).get();
+            case BIGINT:
+            case DOUBLE:
+            case BOOLEAN:
+                return value.toString();
+            default:
+                throw new CompilationException(ErrorCode.CONFIGURATION_PARAMETER_INVALID_TYPE, value.getType());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
new file mode 100644
index 0000000..effe4b8
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.util;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.object.base.AdmObjectNode;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+
+/** Field names and type validation for the WITH object of a dataset declaration. */
+public class DatasetDeclParametersUtil {
+    /* ***********************************************
+     * Merge Policy Parameters
+     * *********************************************** */
+    public static final String MERGE_POLICY_PARAMETER_NAME = "merge-policy";
+    public static final String MERGE_POLICY_NAME_PARAMETER_NAME = "name";
+    public static final String MERGE_POLICY_PARAMETERS_PARAMETER_NAME = "parameters";
+    public static final String MERGE_POLICY_MERGABLE_SIZE_PARAMETER_NAME = "max-mergable-component-size";
+    public static final String MERGE_POLICY_TOLERANCE_COUNT_PARAMETER_NAME = "max-tolerance-component-count";
+    public static final String MERGE_POLICY_NUMBER_COMPONENTS_PARAMETER_NAME = "num-components";
+
+    /* ***********************************************
+     * Storage Block Compression Parameters
+     * *********************************************** */
+    public static final String STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME = "storage-block-compression";
+    public static final String STORAGE_BLOCK_COMPRESSION_SCHEME_PARAMETER_NAME = "scheme";
+
+    /* ***********************************************
+     * Private members
+     * *********************************************** */
+    private static final ARecordType WITH_OBJECT_TYPE = getWithObjectType();
+    private static final AdmObjectNode EMPTY_WITH_OBJECT = new AdmObjectNode();
+
+    private DatasetDeclParametersUtil() {
+    }
+
+    /**
+     * Returns {@code withRecord} as a node validated against the WITH object type, or the shared empty node if null.
+     */
+    public static AdmObjectNode validateAndGetWithObjectNode(RecordConstructor withRecord) throws CompilationException {
+        if (withRecord != null) {
+            final ConfigurationTypeValidator typeValidator = new ConfigurationTypeValidator();
+            final AdmObjectNode withObject = ExpressionUtils.toNode(withRecord);
+            typeValidator.validateType(WITH_OBJECT_TYPE, withObject);
+            return withObject;
+        }
+        return EMPTY_WITH_OBJECT;
+    }
+
+    private static ARecordType getWithObjectType() {
+        final IAType mergePolicy = AUnionType.createUnknownableType(getMergePolicyType());
+        final IAType compression = AUnionType.createUnknownableType(getStorageBlockCompressionType());
+        final String[] fieldNames = { MERGE_POLICY_PARAMETER_NAME, STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME };
+        return new ARecordType("withObject", fieldNames, new IAType[] { mergePolicy, compression }, false);
+    }
+
+    private static ARecordType getMergePolicyType() {
+        // merge-policy.parameters: each field is an unknownable AINT64
+        final String[] paramNames = { MERGE_POLICY_MERGABLE_SIZE_PARAMETER_NAME,
+                MERGE_POLICY_TOLERANCE_COUNT_PARAMETER_NAME, MERGE_POLICY_NUMBER_COMPONENTS_PARAMETER_NAME };
+        final IAType[] paramTypes = new IAType[paramNames.length];
+        for (int i = 0; i < paramTypes.length; i++) {
+            paramTypes[i] = AUnionType.createUnknownableType(BuiltinType.AINT64);
+        }
+        final ARecordType paramsType =
+                new ARecordType(MERGE_POLICY_PARAMETERS_PARAMETER_NAME, paramNames, paramTypes, false);
+        // merge-policy: name is a plain ASTRING, parameters is the unknownable record built above
+        final String[] policyNames = { MERGE_POLICY_NAME_PARAMETER_NAME, MERGE_POLICY_PARAMETERS_PARAMETER_NAME };
+        final IAType[] policyTypes = { BuiltinType.ASTRING, AUnionType.createUnknownableType(paramsType) };
+        return new ARecordType(MERGE_POLICY_PARAMETER_NAME, policyNames, policyTypes, false);
+    }
+
+    private static ARecordType getStorageBlockCompressionType() {
+        final String[] fieldNames = { STORAGE_BLOCK_COMPRESSION_SCHEME_PARAMETER_NAME };
+        final IAType[] fieldTypes = { BuiltinType.ASTRING };
+        return new ARecordType(STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME, fieldNames, fieldTypes, false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java
index 25f9d07..6adb050 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java
@@ -39,13 +39,12 @@ import org.apache.asterix.object.base.AdmNullNode;
 import org.apache.asterix.object.base.AdmObjectNode;
 import org.apache.asterix.object.base.AdmStringNode;
 import org.apache.asterix.object.base.IAdmNode;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
 public class ExpressionUtils {
     private ExpressionUtils() {
     }
 
-    public static IAdmNode toNode(Expression expr) throws AlgebricksException {
+    public static IAdmNode toNode(Expression expr) throws CompilationException {
         switch (expr.getKind()) {
             case LIST_CONSTRUCTOR_EXPRESSION:
                 return toNode((ListConstructor) expr);
@@ -58,7 +57,7 @@ public class ExpressionUtils {
         }
     }
 
-    public static AdmObjectNode toNode(RecordConstructor recordConstructor) throws AlgebricksException {
+    public static AdmObjectNode toNode(RecordConstructor recordConstructor) throws CompilationException {
         AdmObjectNode node = new AdmObjectNode();
         final List<FieldBinding> fbList = recordConstructor.getFbList();
         for (int i = 0; i < fbList.size(); i++) {
@@ -70,7 +69,7 @@ public class ExpressionUtils {
         return node;
     }
 
-    private static IAdmNode toNode(ListConstructor listConstructor) throws AlgebricksException {
+    private static IAdmNode toNode(ListConstructor listConstructor) throws CompilationException {
         final List<Expression> exprList = listConstructor.getExprList();
         AdmArrayNode array = new AdmArrayNode(exprList.size());
         for (int i = 0; i < exprList.size(); i++) {
@@ -79,7 +78,7 @@ public class ExpressionUtils {
         return array;
     }
 
-    private static IAdmNode toNode(LiteralExpr literalExpr) throws AlgebricksException {
+    private static IAdmNode toNode(LiteralExpr literalExpr) throws CompilationException {
         final Literal value = literalExpr.getValue();
         final Literal.Type literalType = value.getLiteralType();
         switch (literalType) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java
deleted file mode 100644
index 6bb5c36..0000000
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.lang.common.util;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.object.base.AdmObjectNode;
-import org.apache.asterix.object.base.AdmStringNode;
-import org.apache.asterix.object.base.IAdmNode;
-
-public class MergePolicyUtils {
-    public static final String MERGE_POLICY_PARAMETER_NAME = "merge-policy";
-    public static final String MERGE_POLICY_NAME_PARAMETER_NAME = "name";
-    public static final String MERGE_POLICY_PARAMETERS_PARAMETER_NAME = "parameters";
-
-    private MergePolicyUtils() {
-    }
-
-    /**
-     * Convert the parameters object to a Map<String,String>
-     * This method should go away once we store the with object as it is in storage
-     *
-     * @param parameters
-     *            the parameters passed for the merge policy in the with clause
-     * @return the parameters as a map
-     */
-    public static Map<String, String> toProperties(AdmObjectNode parameters) throws CompilationException {
-        Map<String, String> map = new HashMap<>();
-        for (Entry<String, IAdmNode> field : parameters.getFields()) {
-            IAdmNode value = field.getValue();
-            switch (value.getType()) {
-                case BOOLEAN:
-                case DOUBLE:
-                case BIGINT:
-                    map.put(field.getKey(), value.toString());
-                    break;
-                case STRING:
-                    map.put(field.getKey(), ((AdmStringNode) value).get());
-                    break;
-                default:
-                    throw new CompilationException(ErrorCode.MERGE_POLICY_PARAMETER_INVALID_TYPE, value.getType());
-            }
-        }
-        return map;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 95526f4..2380ab6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -87,6 +87,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -338,7 +339,8 @@ public class MetadataBootstrap {
                     new AsterixVirtualBufferCacheProvider(datasetId),
                     storageComponentProvider.getIoOperationSchedulerProvider(),
                     appContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, true,
-                    bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, null);
+                    bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, null,
+                    NoOpCompressorDecompressorFactory.INSTANCE);
             DatasetLocalResourceFactory dsLocalResourceFactory =
                     new DatasetLocalResourceFactory(datasetId, lsmBtreeFactory);
             // TODO(amoudi) Creating the index should be done through the same code path as

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index ba1ea03..d9309d9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -163,6 +163,11 @@ public final class MetadataRecordTypes {
     public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 11;
     public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 12;
     public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 13;
+    //Optional open fields
+    public static final String DATASET_ARECORD_BLOCK_LEVEL_STORAGE_COMPRESSION_FIELD_NAME =
+            "BlockLevelStorageCompression";
+    public static final String DATASET_ARECORD_DATASET_COMPRESSION_SCHEME_FIELD_NAME = "DatasetCompressionScheme";
+    public static final String DATASET_ARECORD_REBALANCE_FIELD_NAME = "rebalanceCount";
     public static final ARecordType DATASET_RECORDTYPE = createRecordType(
             // RecordTypeName
             RECORD_NAME_DATASET,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
index f5cfbb3..62a03ad 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
@@ -31,6 +31,9 @@ import org.apache.hyracks.algebricks.common.utils.Pair;
  */
 public class DatasetHints {
 
+    private DatasetHints() {
+    }
+
     /**
      * validate the use of a hint
      *

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
index 301aafb..2a1d551 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
@@ -38,6 +38,7 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
@@ -50,6 +51,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 
 public class BTreeResourceFactoryProvider implements IResourceFactoryProvider {
 
@@ -93,11 +95,20 @@ public class BTreeResourceFactoryProvider implements IResourceFactoryProvider {
             case INTERNAL:
                 AsterixVirtualBufferCacheProvider vbcProvider =
                         new AsterixVirtualBufferCacheProvider(dataset.getDatasetId());
+
+                final ICompressorDecompressorFactory compDecompFactory;
+                if (index.isPrimaryIndex()) {
+                    //Compress only primary index
+                    compDecompFactory = mdProvider.getCompressionManager().getFactory(dataset.getCompressionScheme());
+                } else {
+                    compDecompFactory = NoOpCompressorDecompressorFactory.INSTANCE;
+                }
+
                 return new LSMBTreeLocalResourceFactory(storageManager, typeTraits, cmpFactories, filterTypeTraits,
                         filterCmpFactories, filterFields, opTrackerFactory, ioOpCallbackFactory,
                         metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory,
                         mergePolicyProperties, true, bloomFilterFields, bloomFilterFalsePositiveRate,
-                        index.isPrimaryIndex(), btreeFields);
+                        index.isPrimaryIndex(), btreeFields, compDecompFactory);
             default:
                 throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE,
                         dataset.getDatasetType().toString());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index e212d11..85beb95 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -37,6 +37,7 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.metadata.LockList;
+import org.apache.asterix.common.storage.ICompressionManager;
 import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -1616,4 +1617,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     public ITxnIdFactory getTxnIdFactory() {
         return appCtx.getTxnIdFactory();
     }
+
+    public ICompressionManager getCompressionManager() {
+        return appCtx.getCompressionManager();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index a25ed20..855cf78 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -25,8 +25,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.IntStream;
 
-import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.active.IActiveNotificationHandler;
 import org.apache.asterix.common.api.IDatasetInfoProvider;
 import org.apache.asterix.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -65,7 +63,7 @@ import org.apache.asterix.metadata.utils.RTreeResourceFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.compression.CompressionManager;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
@@ -149,6 +147,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
     private final String metaTypeName;
     private final long rebalanceCount;
     private int pendingOp;
+    private final String compressionScheme;
 
     public Dataset(String dataverseName, String datasetName, String recordTypeDataverseName, String recordTypeName,
             String nodeGroupName, String compactionPolicy, Map<String, String> compactionPolicyProperties,
@@ -156,29 +155,30 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
             int pendingOp) {
         this(dataverseName, datasetName, recordTypeDataverseName, recordTypeName, /*metaTypeDataverseName*/null,
                 /*metaTypeName*/null, nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails,
-                hints, datasetType, datasetId, pendingOp);
+                hints, datasetType, datasetId, pendingOp, CompressionManager.NONE);
     }
 
     public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName,
             String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy,
             Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints,
-            DatasetType datasetType, int datasetId, int pendingOp) {
+            DatasetType datasetType, int datasetId, int pendingOp, String compressionScheme) {
         this(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, metaItemTypeDataverseName,
                 metaItemTypeName, nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints,
-                datasetType, datasetId, pendingOp, 0L);
+                datasetType, datasetId, pendingOp, 0L, compressionScheme);
     }
 
     public Dataset(Dataset dataset) {
         this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName,
                 dataset.metaTypeDataverseName, dataset.metaTypeName, dataset.nodeGroupName,
                 dataset.compactionPolicyFactory, dataset.compactionPolicyProperties, dataset.datasetDetails,
-                dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp, dataset.rebalanceCount);
+                dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp, dataset.rebalanceCount,
+                dataset.compressionScheme);
     }
 
     public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName,
             String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy,
             Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints,
-            DatasetType datasetType, int datasetId, int pendingOp, long rebalanceCount) {
+            DatasetType datasetType, int datasetId, int pendingOp, long rebalanceCount, String compressionScheme) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.recordTypeName = itemTypeName;
@@ -194,6 +194,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
         this.pendingOp = pendingOp;
         this.hints = hints;
         this.rebalanceCount = rebalanceCount;
+        this.compressionScheme = compressionScheme;
     }
 
     @Override
@@ -357,7 +358,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
                     new Dataset(dataverseName, datasetName, getItemTypeDataverseName(), getItemTypeName(),
                             getMetaItemTypeDataverseName(), getMetaItemTypeName(), getNodeGroupName(),
                             getCompactionPolicy(), getCompactionPolicyProperties(), getDatasetDetails(), getHints(),
-                            getDatasetType(), getDatasetId(), MetadataUtil.PENDING_DROP_OP));
+                            getDatasetType(), getDatasetId(), MetadataUtil.PENDING_DROP_OP, getCompressionScheme()));
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
             bActiveTxn.setValue(false);
@@ -644,6 +645,8 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
         tree.put("metaTypeDataverseName", metaTypeDataverseName);
         tree.put("metaTypeName", metaTypeName);
         tree.put("pendingOp", MetadataUtil.pendingOpToString(pendingOp));
+        tree.put("rebalanceCount", rebalanceCount);
+        tree.put("compressionScheme", compressionScheme);
         return tree;
     }
 
@@ -823,7 +826,8 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
         return new Dataset(this.dataverseName, this.datasetName, this.recordTypeDataverseName, this.recordTypeName,
                 this.metaTypeDataverseName, this.metaTypeName, targetNodeGroupName, this.compactionPolicyFactory,
                 this.compactionPolicyProperties, this.datasetDetails, this.hints, this.datasetType,
-                DatasetIdFactory.generateAlternatingDatasetId(this.datasetId), this.pendingOp, this.rebalanceCount + 1);
+                DatasetIdFactory.generateAlternatingDatasetId(this.datasetId), this.pendingOp, this.rebalanceCount + 1,
+                this.compressionScheme);
     }
 
     // Gets an array of partition numbers for this dataset.
@@ -840,4 +844,8 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
     public String getFullyQualifiedName() {
         return dataverseName + '.' + datasetName;
     }
+
+    public String getCompressionScheme() {
+        return compressionScheme;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index d5e179b..27978ab 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -60,10 +60,13 @@ import org.apache.asterix.om.base.ARecord;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.base.AUnorderedList;
 import org.apache.asterix.om.base.IACursor;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.AUnorderedListType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.compression.CompressionManager;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -77,7 +80,6 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
     private static final long serialVersionUID = 1L;
     // Payload field containing serialized Dataset.
     public static final int DATASET_PAYLOAD_TUPLE_FIELD_INDEX = 2;
-    private static final String REBALANCE_ID_FIELD_NAME = "rebalanceCount";
 
     @SuppressWarnings("unchecked")
     protected final ISerializerDeserializer<ARecord> recordSerDes =
@@ -256,14 +258,33 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
             metaTypeName = ((AString) datasetRecord.getValueByPos(metaTypeNameIndex)).getStringValue();
         }
 
-        // Read the rebalance count if there is one.
-        int rebalanceCountIndex = datasetRecord.getType().getFieldIndex(REBALANCE_ID_FIELD_NAME);
-        long rebalanceCount = rebalanceCountIndex >= 0
-                ? ((AInt64) datasetRecord.getValueByPos(rebalanceCountIndex)).getLongValue() : 0;
+        long rebalanceCount = getRebalanceCount(datasetRecord);
+        String compressionScheme = getCompressionScheme(datasetRecord);
 
         return new Dataset(dataverseName, datasetName, typeDataverseName, typeName, metaTypeDataverseName, metaTypeName,
                 nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType,
-                datasetId, pendingOp, rebalanceCount);
+                datasetId, pendingOp, rebalanceCount, compressionScheme);
+    }
+
+    /**
+     * Extracts the rebalance count from a dataset metadata record.
+     *
+     * @param datasetRecord
+     *            the deserialized dataset record
+     * @return the stored count, or 0 if the record has no rebalance-count field
+     */
+    private long getRebalanceCount(ARecord datasetRecord) {
+        final String countFieldName = MetadataRecordTypes.DATASET_ARECORD_REBALANCE_FIELD_NAME;
+        final int position = datasetRecord.getType().getFieldIndex(countFieldName);
+        if (position < 0) {
+            // The record carries no rebalance count.
+            return 0;
+        }
+        return ((AInt64) datasetRecord.getValueByPos(position)).getLongValue();
+    }
+
+    /**
+     * Extracts the compression scheme from a dataset metadata record.
+     *
+     * @param datasetRecord
+     *            the deserialized dataset record
+     * @return the scheme stored in the nested block-level compression record, or
+     *         {@link CompressionManager#NONE} if the record has no such field
+     */
+    private String getCompressionScheme(ARecord datasetRecord) {
+        final ARecordType datasetType = datasetRecord.getType();
+        final int compressionIndex = datasetType
+                .getFieldIndex(MetadataRecordTypes.DATASET_ARECORD_BLOCK_LEVEL_STORAGE_COMPRESSION_FIELD_NAME);
+        if (compressionIndex >= 0) {
+            final ARecordType compressionType = (ARecordType) datasetType.getFieldTypes()[compressionIndex];
+            final int schemeIndex = compressionType
+                    .getFieldIndex(MetadataRecordTypes.DATASET_ARECORD_DATASET_COMPRESSION_SCHEME_FIELD_NAME);
+            // schemeIndex is a position inside the nested compression record, so the value has to be read from
+            // that record; using it against the enclosing dataset record would fetch an unrelated field.
+            final ARecord compressionRecord = (ARecord) datasetRecord.getValueByPos(compressionIndex);
+            return ((AString) compressionRecord.getValueByPos(schemeIndex)).getStringValue();
+        }
+        return CompressionManager.NONE;
     }
 
     @Override
@@ -392,8 +413,19 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
         return tuple;
     }
 
+    /**
+     * Writes the open fields of the dataset record: the meta part, the rebalance count and the block-level
+     * storage compression, in that order. Kept protected to allow other extensions to add additional fields.
+     *
+     * @param dataset
+     *            the dataset whose open fields are written
+     * @throws HyracksDataException
+     *             propagated from the individual field writers
+     */
     protected void writeOpenFields(Dataset dataset) throws HyracksDataException {
-        // write open fields
+        writeMetaPart(dataset);
+        writeRebalanceCount(dataset);
+        writeBlockLevelStorageCompression(dataset);
+    }
+
+    private void writeMetaPart(Dataset dataset) throws HyracksDataException {
         if (dataset.hasMetaPart()) {
             // write open field 1, the meta item type Dataverse name.
             fieldName.reset();
@@ -413,10 +445,35 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
             stringSerde.serialize(aString, fieldValue.getDataOutput());
             recordBuilder.addField(fieldName, fieldValue);
         }
+    }
+
+    /**
+     * Adds the block-level storage compression object as an open field of the dataset record. The object holds
+     * a single field carrying the dataset's compression scheme. Nothing is written when the scheme is
+     * {@link CompressionManager#NONE}.
+     *
+     * @param dataset
+     *            the dataset being serialized
+     * @throws HyracksDataException
+     *             propagated from serializing the field names and values
+     */
+    private void writeBlockLevelStorageCompression(Dataset dataset) throws HyracksDataException {
+        final String scheme = dataset.getCompressionScheme();
+        if (!CompressionManager.NONE.equals(scheme)) {
+            // Nested object: { <scheme field name>: <scheme> }
+            final RecordBuilder nestedBuilder = new RecordBuilder();
+            nestedBuilder.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+            fieldName.reset();
+            aString.setValue(MetadataRecordTypes.DATASET_ARECORD_DATASET_COMPRESSION_SCHEME_FIELD_NAME);
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            aString.setValue(scheme);
+            stringSerde.serialize(aString, fieldValue.getDataOutput());
+            nestedBuilder.addField(fieldName, fieldValue);
+
+            // Enclosing open field: <compression field name> -> nested object
+            fieldName.reset();
+            aString.setValue(MetadataRecordTypes.DATASET_ARECORD_BLOCK_LEVEL_STORAGE_COMPRESSION_FIELD_NAME);
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            nestedBuilder.write(fieldValue.getDataOutput(), true);
+            recordBuilder.addField(fieldName, fieldValue);
+        }
+    }
+
+    private void writeRebalanceCount(Dataset dataset) throws HyracksDataException {
         if (dataset.getRebalanceCount() > 0) {
             // Adds the field rebalanceCount.
             fieldName.reset();
-            aString.setValue("rebalanceCount");
+            aString.setValue(MetadataRecordTypes.DATASET_ARECORD_REBALANCE_FIELD_NAME);
             stringSerde.serialize(aString, fieldName.getDataOutput());
             fieldValue.reset();
             aBigInt.setValue(dataset.getRebalanceCount());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
index b87ef2a..902ee41 100644
--- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
@@ -24,12 +24,12 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
 import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.runtime.compression.CompressionManager;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.junit.Assert;
@@ -51,9 +51,9 @@ public class DatasetTupleTranslatorTest {
                     indicator == null ? null : Collections.singletonList(indicator),
                     Collections.singletonList(BuiltinType.AINT64), false, Collections.emptyList());
 
-            Dataset dataset =
-                    new Dataset("test", "log", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES", "prefix",
-                            compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0);
+            Dataset dataset = new Dataset("test", "log", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES",
+                    "prefix", compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0,
+                    CompressionManager.NONE);
             DatasetTupleTranslator dtTranslator = new DatasetTupleTranslator(true);
             ITupleReference tuple = dtTranslator.getTupleFromMetadataEntity(dataset);
             Dataset deserializedDataset = dtTranslator.getMetadataEntityFromTuple(tuple);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
index ab4229c..7080dee 100644
--- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
@@ -30,7 +30,6 @@ import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.metadata.MetadataNode;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Datatype;
@@ -41,6 +40,7 @@ import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningS
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.compression.CompressionManager;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.junit.Assert;
@@ -62,9 +62,9 @@ public class IndexTupleTranslatorTest {
                     indicator == null ? null : Collections.singletonList(indicator),
                     Collections.singletonList(BuiltinType.AINT64), false, Collections.emptyList());
 
-            Dataset dataset =
-                    new Dataset("test", "d1", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES", "prefix",
-                            compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0);
+            Dataset dataset = new Dataset("test", "d1", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES",
+                    "prefix", compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0,
+                    CompressionManager.NONE);
 
             Index index = new Index("test", "d1", "i1", IndexType.BTREE,
                     Collections.singletonList(Collections.singletonList("row_id")),

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
index 99b7176..6d2658b 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
@@ -153,6 +153,14 @@ public class AdmObjectNode implements IAdmNode {
         return getString(this, field);
     }
 
+    /**
+     * Looks up a string field that may be absent.
+     *
+     * @param field
+     *            the field name
+     * @return the string value of the field, or {@code null} if this object has no such field
+     */
+    public String getOptionalString(String field) {
+        final IAdmNode value = get(field);
+        return value == null ? null : ((AdmStringNode) value).get();
+    }
+
     public static String getString(AdmObjectNode openFields, String field) throws HyracksDataException {
         IAdmNode node = openFields.get(field);
         if (node == null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java
new file mode 100644
index 0000000..3bffa9a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.compression;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.storage.ICompressionManager;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.storage.common.compression.SnappyCompressorDecompressorFactory;
+
+/**
+ * Maps compression scheme names to their {@link ICompressorDecompressorFactory} classes and resolves which scheme
+ * to use: the one given in a DDL statement or, when none is given, the configured default.
+ * Scheme names are matched case-insensitively and are always handed out in their registered (lower-case) form.
+ */
+public class CompressionManager implements ICompressionManager {
+    public static final String NONE = "none";
+    private static final Map<String, Class<? extends ICompressorDecompressorFactory>> REGISTERED_SCHEMES =
+            getRegisteredSchemes();
+    private final String defaultScheme;
+
+    /*
+     * New compression schemes can be added by registering the name and the factory class. Names must be
+     * registered in lower case because lookups normalize the requested name to lower case.
+     *
+     * WARNING: Changing a scheme name will break storage backward compatibility. Before upgrading to a newer
+     * version of the registered schemes, make sure it is also backward-compatible with the previous version.
+     */
+    private static Map<String, Class<? extends ICompressorDecompressorFactory>> getRegisteredSchemes() {
+        final Map<String, Class<? extends ICompressorDecompressorFactory>> registeredSchemes = new HashMap<>();
+        //No compression
+        registeredSchemes.put(NONE, NoOpCompressorDecompressorFactory.class);
+        registeredSchemes.put("snappy", SnappyCompressorDecompressorFactory.class);
+        return registeredSchemes;
+    }
+
+    /**
+     * @param storageProperties
+     *            storage configuration providing the default compression scheme
+     * @throws IllegalStateException
+     *             if the configured scheme is not registered
+     */
+    public CompressionManager(StorageProperties storageProperties) {
+        validateCompressionConfiguration(storageProperties);
+        defaultScheme = normalize(storageProperties.getCompressionScheme());
+    }
+
+    /**
+     * Instantiates the factory registered for the given scheme.
+     *
+     * @param schemeName
+     *            scheme name from the DDL (any casing), or {@code null} to use the default scheme
+     * @return a new factory instance
+     * @throws CompilationException
+     *             if {@code schemeName} is not a registered scheme
+     */
+    @Override
+    public ICompressorDecompressorFactory getFactory(String schemeName) throws CompilationException {
+        final String scheme = getDdlOrDefaultCompressionScheme(schemeName);
+        final Class<? extends ICompressorDecompressorFactory> clazz = REGISTERED_SCHEMES.get(scheme);
+        try {
+            // Class#newInstance is deprecated; going through the constructor also wraps constructor failures.
+            return clazz.getDeclaredConstructor().newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException("Failed to instantiate compressor/decompressor: " + scheme, e);
+        }
+    }
+
+    /**
+     * Resolves the scheme to use.
+     *
+     * @param ddlScheme
+     *            scheme name from the DDL (any casing), or {@code null} if none was given
+     * @return the registered (lower-case) name of {@code ddlScheme}, or the default scheme when it is {@code null}
+     * @throws CompilationException
+     *             if {@code ddlScheme} is not a registered scheme
+     */
+    @Override
+    public String getDdlOrDefaultCompressionScheme(String ddlScheme) throws CompilationException {
+        if (ddlScheme == null) {
+            return defaultScheme;
+        }
+        if (!isRegisteredScheme(ddlScheme)) {
+            throw new CompilationException(ErrorCode.UNKNOWN_COMPRESSION_SCHEME, ddlScheme, formatSupportedValues());
+        }
+        // Validation is case-insensitive, so return the registered spelling; otherwise the map lookup in
+        // getFactory and comparisons against NONE would miss for input such as "Snappy".
+        return normalize(ddlScheme);
+    }
+
+    /**
+     * Register factory classes for persisted resources
+     *
+     * @param registeredClasses
+     *            map that receives every registered factory class, keyed by the class's simple name
+     */
+    public static void registerCompressorDecompressorsFactoryClasses(
+            Map<String, Class<? extends IJsonSerializable>> registeredClasses) {
+        for (Class<? extends ICompressorDecompressorFactory> clazz : REGISTERED_SCHEMES.values()) {
+            registeredClasses.put(clazz.getSimpleName(), clazz);
+        }
+    }
+
+    /**
+     * @param schemeName
+     *            Compression scheme name (any casing), may be {@code null}
+     * @return
+     *         true if it is registered
+     */
+    private boolean isRegisteredScheme(String schemeName) {
+        return schemeName != null && REGISTERED_SCHEMES.containsKey(normalize(schemeName));
+    }
+
+    /**
+     * @param schemeName
+     *            non-null scheme name
+     * @return the name in the form used as key of the registry (locale-independent lower case)
+     */
+    private static String normalize(String schemeName) {
+        return schemeName.toLowerCase(Locale.ROOT);
+    }
+
+    /**
+     * Validate the configuration of StorageProperties
+     *
+     * @param storageProperties
+     *            storage configuration to check
+     * @throws IllegalStateException
+     *             if the configured compression scheme is missing or not registered
+     */
+    private void validateCompressionConfiguration(StorageProperties storageProperties) {
+        final String value = storageProperties.getCompressionScheme();
+        if (!isRegisteredScheme(value)) {
+            final String option = StorageProperties.Option.STORAGE_COMPRESSION_BLOCK.ini();
+            throw new IllegalStateException("Invalid compression configuration (" + option + " = " + value
+                    + "). Valid values are: " + formatSupportedValues());
+        }
+    }
+
+    /**
+     * @return the registered scheme names as {@code [name1,name2,...]}, in the registry's iteration order
+     */
+    private String formatSupportedValues() {
+        final StringBuilder schemes = new StringBuilder();
+        final Iterator<String> iterator = REGISTERED_SCHEMES.keySet().iterator();
+        schemes.append('[');
+        schemes.append(iterator.next());
+        while (iterator.hasNext()) {
+            schemes.append(',');
+            schemes.append(iterator.next());
+        }
+        schemes.append(']');
+        return schemes.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 4157e16..0d2a1df 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -24,7 +24,6 @@ import java.util.function.Supplier;
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.api.INodeJobTracker;
-import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.ActiveProperties;
@@ -44,7 +43,10 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
+import org.apache.asterix.common.storage.ICompressionManager;
 import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.runtime.compression.CompressionManager;
 import org.apache.asterix.runtime.job.listener.NodeJobTracker;
 import org.apache.asterix.runtime.transaction.ResourceIdManager;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -87,6 +89,7 @@ public class CcApplicationContext implements ICcApplicationContext {
     private IClusterStateManager clusterStateManager;
     private final INodeJobTracker nodeJobTracker;
     private final ITxnIdFactory txnIdFactory;
+    private final ICompressionManager compressionManager;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
             ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
@@ -121,6 +124,7 @@ public class CcApplicationContext implements ICcApplicationContext {
         this.resourceIdManager = new ResourceIdManager(clusterStateManager);
         nodeJobTracker = new NodeJobTracker();
         txnIdFactory = new BulkTxnIdFactory();
+        compressionManager = new CompressionManager(storageProperties);
 
     }
 
@@ -270,7 +274,13 @@ public class CcApplicationContext implements ICcApplicationContext {
         return NoOpCoordinationService.INSTANCE;
     }
 
+    /**
+     * @return the transaction id factory held by this context
+     */
+    @Override
     public ITxnIdFactory getTxnIdFactory() {
         return txnIdFactory;
     }
+
+    /**
+     * @return the compression manager held by this context
+     */
+    @Override
+    public ICompressionManager getCompressionManager() {
+        return compressionManager;
+    }
 }