You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/11/07 07:15:49 UTC
[04/10] asterixdb git commit: [ASTERIXDB-2422][STO] Introduce compressed storage
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index ed864cc..e52449c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -11121,4 +11121,63 @@
</test-case>
</test-group>
&GeoQueries;
+ <test-group name="compression">
+ <test-case FilePath="compression">
+ <compilation-unit name="incompressible-pages/large-page">
+ <output-dir compare="Text">incompressible-pages/large-page</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="compression">
+ <compilation-unit name="incompressible-pages/small-page">
+ <output-dir compare="Text">incompressible-pages/small-page</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="compression">
+ <compilation-unit name="invalid-compression-scheme">
+ <output-dir compare="Text">invalid-compression-scheme</output-dir>
+ <expected-error>ASX1096: Unknown compression scheme zip. Supported schemes are [snappy,none]</expected-error>
+ <source-location>false</source-location>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="compression">
+ <compilation-unit name="scheme-none">
+ <output-dir compare="Text">scheme-none</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="compression">
+ <compilation-unit name="scheme-snappy">
+ <output-dir compare="Text">scheme-snappy</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+ <test-group name="ddl-with-clause">
+ <test-case FilePath="ddl-with-clause">
+ <compilation-unit name="missing-non-optional">
+ <output-dir compare="Text">missing-non-optional</output-dir>
+ <expected-error>ASX1061: Field "merge-policy.name" in the with clause cannot be null or missing</expected-error>
+ <source-location>false</source-location>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="ddl-with-clause">
+ <compilation-unit name="type-mismatch">
+ <output-dir compare="Text">type-mismatch</output-dir>
+ <expected-error>ASX1060: Field "merge-policy.parameters.max-mergable-component-size" in the with clause must be of type bigint, but found string</expected-error>
+ <source-location>false</source-location>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="ddl-with-clause">
+ <compilation-unit name="unsupported-field">
+ <output-dir compare="Text">unsupported-field</output-dir>
+ <expected-error>ASX1059: Field(s) [unknown-field] unsupported in the with clause</expected-error>
+ <source-location>false</source-location>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="ddl-with-clause">
+ <compilation-unit name="unsupported-subfield">
+ <output-dir compare="Text">unsupported-subfield</output-dir>
+ <expected-error>ASX1097: Subfield(s) [unknown-subfield] in "merge-policy" unsupported in the with clause</expected-error>
+ <source-location>false</source-location>
+ </compilation-unit>
+ </test-case>
+ </test-group>
</test-suite>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 85e44ea..6bd73e7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -22,6 +22,7 @@ import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
import static org.apache.hyracks.control.common.config.OptionTypes.UNSIGNED_INTEGER;
import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE;
@@ -47,7 +48,8 @@ public class StorageProperties extends AbstractProperties {
STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS(POSITIVE_INTEGER, 2),
STORAGE_METADATA_MEMORYCOMPONENT_NUMPAGES(POSITIVE_INTEGER, 8),
STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d),
- STORAGE_MAX_ACTIVE_WRITABLE_DATASETS(UNSIGNED_INTEGER, 8);
+ STORAGE_MAX_ACTIVE_WRITABLE_DATASETS(UNSIGNED_INTEGER, 8),
+ STORAGE_COMPRESSION_BLOCK(STRING, "none");
private final IOptionType interpreter;
private final Object defaultValue;
@@ -88,6 +90,8 @@ public class StorageProperties extends AbstractProperties {
return "The maximum acceptable false positive rate for bloom filters associated with LSM indexes";
case STORAGE_MAX_ACTIVE_WRITABLE_DATASETS:
return "The maximum number of datasets that can be concurrently modified";
+ case STORAGE_COMPRESSION_BLOCK:
+ return "The default compression scheme for the storage";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -179,6 +183,10 @@ public class StorageProperties extends AbstractProperties {
return accessor.getInt(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS);
}
+ public String getCompressionScheme() {
+ return accessor.getString(Option.STORAGE_COMPRESSION_BLOCK);
+ }
+
protected int getMetadataDatasets() {
return MetadataIndexImmutableProperties.METADATA_DATASETS_COUNT;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 18e3327..81ae3e1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -21,14 +21,15 @@ package org.apache.asterix.common.dataflow;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.api.INodeJobTracker;
-import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.ExtensionProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
+import org.apache.asterix.common.storage.ICompressionManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.storage.common.IStorageManager;
@@ -127,4 +128,9 @@ public interface ICcApplicationContext extends IApplicationContext {
* @return the transaction id factory
*/
ITxnIdFactory getTxnIdFactory();
+
+ /**
+ * @return the compression manager
+ */
+ ICompressionManager getCompressionManager();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 0bf446e..54fd65c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -147,7 +147,7 @@ public class ErrorCode {
public static final int UNSUPPORTED_WITH_FIELD = 1059;
public static final int WITH_FIELD_MUST_BE_OF_TYPE = 1060;
public static final int WITH_FIELD_MUST_CONTAIN_SUB_FIELD = 1061;
- public static final int MERGE_POLICY_PARAMETER_INVALID_TYPE = 1062;
+ public static final int CONFIGURATION_PARAMETER_INVALID_TYPE = 1062;
public static final int UNKNOWN_DATAVERSE = 1063;
public static final int ERROR_OCCURRED_BETWEEN_TWO_TYPES_CONVERSION = 1064;
public static final int CHOSEN_INDEX_COUNT_SHOULD_BE_GREATER_THAN_ONE = 1065;
@@ -181,6 +181,8 @@ public class ErrorCode {
public static final int COMPILATION_TRANSLATION_ERROR = 1093;
public static final int RANGE_MAP_ERROR = 1094;
public static final int COMPILATION_EXPECTED_FUNCTION_CALL = 1095;
+ public static final int UNKNOWN_COMPRESSION_SCHEME = 1096;
+ public static final int UNSUPPORTED_WITH_SUBFIELD = 1097;
// Feed errors
public static final int DATAFLOW_ILLEGAL_STATE = 3001;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java
new file mode 100644
index 0000000..b9a0baf
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ICompressionManager.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+
+/**
+ * Manages the registered storage compression schemes: resolves a scheme name to its
+ * compressor/decompressor factory and validates the provided compression configurations.
+ */
+public interface ICompressionManager {
+
+ /**
+ * Get the CompressorDecompressorFactory registered for the given compression scheme.
+ *
+ * @param schemeName
+ * Compression scheme name
+ * @return the Compressor/Decompressor factory if a scheme is specified, or a no-op (NOOP) factory otherwise
+ * @throws CompilationException presumably when {@code schemeName} is not a registered scheme (see ASX1096)
+ */
+ ICompressorDecompressorFactory getFactory(String schemeName) throws CompilationException;
+
+ /**
+ * Get the compression scheme specified in the DDL, or the default scheme when the DDL gives none.
+ *
+ * @param ddlScheme
+ * Compression scheme name from the DDL; presumably {@code null} when the DDL does not specify one
+ * @return the DDL compression scheme name if specified, otherwise the default scheme name
+ * @throws CompilationException presumably when the scheme name is not a registered scheme
+ */
+ String getDdlOrDefaultCompressionScheme(String ddlScheme) throws CompilationException;
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 8c17ec6..7aa9b91 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -132,10 +132,10 @@
1056 = Too many options were specified for %1$s
1057 = Expression of type %1$s is not supported in constant record
1058 = Literal of type %1$s is not supported in constant record
-1059 = Field \"%1$s\" unsupported in the with clause
-1060 = Field \"%1$s\" in the with clause must be of type %2$s
-1061 = Field \"%1$s\" in the with clause must contain sub field \"%2$s\"
-1062 = Merge policy parameters cannot be of type %1$s
+1059 = Field(s) %1$s unsupported in the with clause
+1060 = Field \"%1$s\" in the with clause must be of type %2$s, but found %3$s
+1061 = Field \"%1$s\" in the with clause cannot be null or missing
+1062 = Configuration parameter cannot be of type %1$s
1063 = Cannot find dataverse with name %1$s
1064 = An error was occurred while converting type %1$s to type %2$s.
1065 = There should be at least two applicable indexes.
@@ -168,6 +168,8 @@
1093 = A parser error has occurred. The detail exception: %1$s
1094 = Cannot parse range map: %1$s
1095 = Expected function call
+1096 = Unknown compression scheme %1$s. Supported schemes are %2$s
+1097 = Subfield(s) %1$s in \"%2$s\" unsupported in the with clause
# Feed Errors
3001 = Illegal state.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/pom.xml b/asterixdb/asterix-lang-common/pom.xml
index dde41e0..bb153a5 100644
--- a/asterixdb/asterix-lang-common/pom.xml
+++ b/asterixdb/asterix-lang-common/pom.xml
@@ -107,5 +107,9 @@
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
index 3d5b815..9f01b1c 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
@@ -18,19 +18,19 @@
*/
package org.apache.asterix.lang.common.statement;
+import java.util.Map;
+
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.lang.common.base.AbstractStatement;
import org.apache.asterix.lang.common.expression.RecordConstructor;
import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.util.ConfigurationUtil;
import org.apache.asterix.lang.common.util.ExpressionUtils;
-import org.apache.asterix.lang.common.util.MergePolicyUtils;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.object.base.AdmObjectNode;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
-import java.util.Map;
-
/**
* The new create feed statement only concerns the feed adaptor configuration.
* All feeds are considered as primary feeds.
@@ -76,7 +76,7 @@ public class CreateFeedStatement extends AbstractStatement {
}
public Map<String, String> getConfiguration() throws CompilationException {
- return MergePolicyUtils.toProperties(withObjectNode);
+ return ConfigurationUtil.toProperties(withObjectNode);
}
public AdmObjectNode getWithObjectNode() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
index 4aeb6d3..0a17b24 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
@@ -18,31 +18,22 @@
*/
package org.apache.asterix.lang.common.statement;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.lang.common.base.AbstractStatement;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.RecordConstructor;
import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.util.ExpressionUtils;
-import org.apache.asterix.lang.common.util.MergePolicyUtils;
+import org.apache.asterix.lang.common.util.ConfigurationUtil;
+import org.apache.asterix.lang.common.util.DatasetDeclParametersUtil;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.object.base.AdmObjectNode;
-import org.apache.asterix.object.base.AdmStringNode;
import org.apache.asterix.object.base.IAdmNode;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.asterix.runtime.compression.CompressionManager;
public class DatasetDecl extends AbstractStatement {
- protected static final String[] WITH_OBJECT_FIELDS = new String[] { MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME };
- protected static final Set<String> WITH_OBJECT_FIELDS_SET = new HashSet<>(Arrays.asList(WITH_OBJECT_FIELDS));
-
protected final Identifier name;
protected final Identifier dataverse;
protected final Identifier itemTypeDataverse;
@@ -76,14 +67,7 @@ public class DatasetDecl extends AbstractStatement {
}
this.nodegroupName = nodeGroupName;
this.hints = hints;
- try {
- this.withObjectNode = withRecord == null ? null : ExpressionUtils.toNode(withRecord);
- } catch (CompilationException e) {
- throw e;
- } catch (AlgebricksException e) {
- // TODO(tillw) make signatures throw Algebricks exceptions
- throw new CompilationException(e);
- }
+ this.withObjectNode = DatasetDeclParametersUtil.validateAndGetWithObjectNode(withRecord);
this.ifNotExists = ifNotExists;
this.datasetType = datasetType;
this.datasetDetailsDecl = idd;
@@ -141,50 +125,17 @@ public class DatasetDecl extends AbstractStatement {
return nodegroupName;
}
- public String getCompactionPolicy() throws CompilationException {
- AdmObjectNode mergePolicy = getMergePolicyObject();
- if (mergePolicy == null) {
- return null;
- }
- IAdmNode mergePolicyName = mergePolicy.get(MergePolicyUtils.MERGE_POLICY_NAME_PARAMETER_NAME);
- if (mergePolicyName == null) {
- throw new CompilationException(ErrorCode.WITH_FIELD_MUST_CONTAIN_SUB_FIELD,
- MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME, MergePolicyUtils.MERGE_POLICY_NAME_PARAMETER_NAME);
- }
- if (mergePolicyName.getType() != ATypeTag.STRING) {
- throw new CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE,
- MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME + '.'
- + MergePolicyUtils.MERGE_POLICY_NAME_PARAMETER_NAME,
- ATypeTag.STRING);
- }
- return ((AdmStringNode) mergePolicyName).get();
+ private AdmObjectNode getMergePolicyObject() {
+ return (AdmObjectNode) withObjectNode.get(DatasetDeclParametersUtil.MERGE_POLICY_PARAMETER_NAME);
}
- private static AdmObjectNode validateWithObject(AdmObjectNode withObject) throws CompilationException {
- if (withObject == null) {
- return null;
- }
- for (String name : withObject.getFieldNames()) {
- if (!WITH_OBJECT_FIELDS_SET.contains(name)) {
- throw new CompilationException(ErrorCode.UNSUPPORTED_WITH_FIELD, name);
- }
- }
- return withObject;
- }
-
- private AdmObjectNode getMergePolicyObject() throws CompilationException {
- if (withObjectNode == null) {
- return null;
- }
- IAdmNode mergePolicy = validateWithObject(withObjectNode).get(MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME);
+ public String getCompactionPolicy() {
+ AdmObjectNode mergePolicy = getMergePolicyObject();
if (mergePolicy == null) {
return null;
}
- if (!mergePolicy.isObject()) {
- throw new CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE,
- MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME, ATypeTag.OBJECT);
- }
- return (AdmObjectNode) mergePolicy;
+
+ return mergePolicy.getOptionalString(DatasetDeclParametersUtil.MERGE_POLICY_NAME_PARAMETER_NAME);
}
public Map<String, String> getCompactionPolicyProperties() throws CompilationException {
@@ -192,17 +143,26 @@ public class DatasetDecl extends AbstractStatement {
if (mergePolicy == null) {
return null;
}
- IAdmNode mergePolicyParameters = mergePolicy.get(MergePolicyUtils.MERGE_POLICY_PARAMETERS_PARAMETER_NAME);
+ IAdmNode mergePolicyParameters =
+ mergePolicy.get(DatasetDeclParametersUtil.MERGE_POLICY_PARAMETERS_PARAMETER_NAME);
if (mergePolicyParameters == null) {
return null;
}
- if (mergePolicyParameters.getType() != ATypeTag.OBJECT) {
- throw new CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE,
- MergePolicyUtils.MERGE_POLICY_PARAMETER_NAME + '.'
- + MergePolicyUtils.MERGE_POLICY_PARAMETERS_PARAMETER_NAME,
- ATypeTag.OBJECT);
+ return ConfigurationUtil.toProperties((AdmObjectNode) mergePolicyParameters);
+ }
+
+ public String getDatasetCompressionScheme() {
+ if (datasetType != DatasetType.INTERNAL) {
+ return CompressionManager.NONE;
+ }
+
+ final AdmObjectNode storageBlockCompression =
+ (AdmObjectNode) withObjectNode.get(DatasetDeclParametersUtil.STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME);
+ if (storageBlockCompression == null) {
+ return null;
}
- return MergePolicyUtils.toProperties((AdmObjectNode) mergePolicyParameters);
+ return storageBlockCompression
+ .getOptionalString(DatasetDeclParametersUtil.STORAGE_BLOCK_COMPRESSION_SCHEME_PARAMETER_NAME);
}
public Map<String, String> getHints() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java
new file mode 100644
index 0000000..4d0baeb
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationTypeValidator.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.util;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.object.base.AdmArrayNode;
+import org.apache.asterix.object.base.AdmObjectNode;
+import org.apache.asterix.object.base.IAdmNode;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+/**
+ * Validates a with-clause configuration value ({@link IAdmNode}) against an expected {@link IAType}.
+ * The first violation found is reported as a {@link CompilationException} whose message names the
+ * offending field path, e.g. {@code merge-policy.parameters.max-mergable-component-size}.
+ * <p>
+ * Error details are kept in mutable, unsynchronized fields, so an instance must not be shared
+ * between threads; each call to {@link #validateType(IAType, IAdmNode)} starts from a clean state.
+ */
+class ConfigurationTypeValidator {
+    // Error details recorded by the first failed check and consumed by throwException().
+    private final Set<String> unknownFieldNames;
+    // Path segments are pushed innermost-first while the recursion unwinds; popping yields outermost-first.
+    private final Deque<String> path;
+    private final MutablePair<ATypeTag, ATypeTag> expectedActualTypePair;
+    private ErrorType result;
+
+    public enum ErrorType {
+        UNKNOWN_FIELD_NAMES,
+        TYPE_MISMATCH,
+        MISSING_UNOPTIONAL_FIELD
+    }
+
+    protected ConfigurationTypeValidator() {
+        // Sorted, so that an error listing several unknown fields is reported deterministically.
+        unknownFieldNames = new TreeSet<>();
+        path = new ArrayDeque<>();
+        expectedActualTypePair = new MutablePair<>(null, null);
+    }
+
+    /**
+     * Validates {@code node} against {@code type}.
+     *
+     * @param type
+     *            the expected type of the configuration value
+     * @param node
+     *            the configuration value to check
+     * @throws CompilationException
+     *             if {@code node} has fields {@code type} does not declare, holds a value of the wrong
+     *             type, or lacks (or nulls) a field that is not optional
+     */
+    public void validateType(IAType type, IAdmNode node) throws CompilationException {
+        reset();
+        if (!validate(type, node)) {
+            throwException();
+        }
+    }
+
+    // Clears error state left over from a previous call so that an instance can be reused.
+    private void reset() {
+        unknownFieldNames.clear();
+        path.clear();
+        expectedActualTypePair.left = null;
+        expectedActualTypePair.right = null;
+        result = null;
+    }
+
+    private boolean validate(IAType type, IAdmNode node) {
+        final ATypeTag typeTag = type.getTypeTag();
+        // Only an optional (union) type accepts an absent or null value. Checking this first also keeps
+        // the object/array validators from dereferencing a null node.
+        if (typeTag != ATypeTag.UNION && isAbsent(node)) {
+            result = ErrorType.MISSING_UNOPTIONAL_FIELD;
+            return false;
+        }
+        if (typeTag.isDerivedType()) {
+            return validateDerivedType(type, node);
+        }
+        if (typeTag != node.getType()) {
+            setExpectedAndActualType(typeTag, node.getType());
+            return false;
+        }
+        return true;
+    }
+
+    // A value is absent when the field was not provided at all or was given as null.
+    private static boolean isAbsent(IAdmNode node) {
+        return node == null || node.getType() == ATypeTag.NULL;
+    }
+
+    private boolean validateDerivedType(IAType type, IAdmNode node) {
+        final ATypeTag typeTag = type.getTypeTag();
+        switch (typeTag) {
+            case UNION:
+                return validateUnionType(type, node);
+            case OBJECT:
+                return validateObject(type, node);
+            case ARRAY:
+                return validateArray(type, node);
+            default:
+                throw new IllegalStateException("Unsupported derived type: " + typeTag);
+        }
+    }
+
+    private boolean validateUnionType(IAType type, IAdmNode node) {
+        if (isAbsent(node)) {
+            return true;
+        }
+        return validate(((AUnionType) type).getActualType(), node);
+    }
+
+    private boolean validateObject(IAType type, IAdmNode node) {
+        if (node.getType() != ATypeTag.OBJECT) {
+            setExpectedAndActualType(ATypeTag.OBJECT, node.getType());
+            return false;
+        }
+
+        final ARecordType recordType = (ARecordType) type;
+        final AdmObjectNode objectNode = (AdmObjectNode) node;
+
+        final String[] fieldNames = recordType.getFieldNames();
+
+        // Reject any field that the expected record type does not declare.
+        final Set<String> definedFieldNames = new HashSet<>(Arrays.asList(fieldNames));
+        final Set<String> objectFieldNames = objectNode.getFieldNames();
+        if (!definedFieldNames.containsAll(objectFieldNames)) {
+            setUnknownFieldNames(definedFieldNames, objectFieldNames);
+            return false;
+        }
+
+        // Validate every declared field, including declared fields the node does not contain.
+        final IAType[] fieldTypes = recordType.getFieldTypes();
+        for (int i = 0; i < fieldTypes.length; i++) {
+            if (!validate(fieldTypes[i], objectNode.get(fieldNames[i]))) {
+                addToPath(fieldNames[i]);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean validateArray(IAType type, IAdmNode node) {
+        if (node.getType() != ATypeTag.ARRAY) {
+            setExpectedAndActualType(ATypeTag.ARRAY, node.getType());
+            return false;
+        }
+
+        final IAType itemType = ((AOrderedListType) type).getItemType();
+        final AdmArrayNode array = (AdmArrayNode) node;
+        for (int i = 0; i < array.size(); i++) {
+            if (!validate(itemType, array.get(i))) {
+                addToPath(i);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void setUnknownFieldNames(Set<String> definedFieldNames, Set<String> objectFieldNames) {
+        unknownFieldNames.addAll(objectFieldNames);
+        unknownFieldNames.removeAll(definedFieldNames);
+        result = ErrorType.UNKNOWN_FIELD_NAMES;
+    }
+
+    private void setExpectedAndActualType(ATypeTag expectedTypeTag, ATypeTag actualTypeTag) {
+        expectedActualTypePair.left = expectedTypeTag;
+        expectedActualTypePair.right = actualTypeTag;
+        result = ErrorType.TYPE_MISMATCH;
+    }
+
+    private void addToPath(String fieldName) {
+        pushPathSegment(fieldName);
+    }
+
+    private void addToPath(int arrayIndex) {
+        pushPathSegment("[" + arrayIndex + "]");
+    }
+
+    // Joins segments with '.', except directly before an array index: "a.b", "a[0]", "a[0].b".
+    private void pushPathSegment(String segment) {
+        if (path.isEmpty() || path.peek().startsWith("[")) {
+            path.push(segment);
+        } else {
+            path.push(segment + ".");
+        }
+    }
+
+    private void throwException() throws CompilationException {
+        final StringBuilder pathBuilder = new StringBuilder();
+        while (!path.isEmpty()) {
+            pathBuilder.append(path.pop());
+        }
+        final String fieldPath = pathBuilder.toString();
+        switch (result) {
+            case UNKNOWN_FIELD_NAMES:
+                if (!fieldPath.isEmpty()) {
+                    throw new CompilationException(ErrorCode.UNSUPPORTED_WITH_SUBFIELD, unknownFieldNames.toString(),
+                            fieldPath);
+                }
+                throw new CompilationException(ErrorCode.UNSUPPORTED_WITH_FIELD, unknownFieldNames.toString());
+            case TYPE_MISMATCH:
+                throw new CompilationException(ErrorCode.WITH_FIELD_MUST_BE_OF_TYPE, fieldPath,
+                        expectedActualTypePair.left, expectedActualTypePair.right);
+            case MISSING_UNOPTIONAL_FIELD:
+                throw new CompilationException(ErrorCode.WITH_FIELD_MUST_CONTAIN_SUB_FIELD, fieldPath);
+            default:
+                throw new IllegalStateException("Unexpected validation error: " + result);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java
new file mode 100644
index 0000000..4bb799b
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ConfigurationUtil.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.object.base.AdmObjectNode;
+import org.apache.asterix.object.base.AdmStringNode;
+import org.apache.asterix.object.base.IAdmNode;
+
+/**
+ * Helpers for turning with-clause configuration objects into the string key/value form used by
+ * configuration consumers such as merge policies and feeds.
+ */
+public class ConfigurationUtil {
+
+    private ConfigurationUtil() {
+    }
+
+    /**
+     * Converts a configuration object to a {@code Map<String, String>}, stringifying each field value
+     * with {@link #getStringValue(IAdmNode)}.
+     * This method should go away once we store the with object as it is in storage.
+     *
+     * @param parameters
+     *            a configuration object from a with clause, e.g. merge-policy parameters or a feed configuration
+     * @return the fields of {@code parameters} as a mutable map from field name to string value
+     * @throws CompilationException
+     *             if any field value is not a boolean, double, bigint or string
+     */
+    public static Map<String, String> toProperties(AdmObjectNode parameters) throws CompilationException {
+        Map<String, String> map = new HashMap<>();
+        for (Entry<String, IAdmNode> field : parameters.getFields()) {
+            map.put(field.getKey(), getStringValue(field.getValue()));
+        }
+        return map;
+    }
+
+    /**
+     * Returns the string form of a scalar configuration value.
+     *
+     * @param value
+     *            the value; must be of type boolean, double, bigint or string
+     * @return the contents of a string value, or {@code value.toString()} for the other supported types
+     * @throws CompilationException
+     *             ({@link ErrorCode#CONFIGURATION_PARAMETER_INVALID_TYPE}) if {@code value} has any other type
+     */
+    public static String getStringValue(IAdmNode value) throws CompilationException {
+        switch (value.getType()) {
+            case BOOLEAN:
+            case DOUBLE:
+            case BIGINT:
+                return value.toString();
+            case STRING:
+                // Use the raw string content rather than the node's toString() form.
+                return ((AdmStringNode) value).get();
+            default:
+                throw new CompilationException(ErrorCode.CONFIGURATION_PARAMETER_INVALID_TYPE, value.getType());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
new file mode 100644
index 0000000..effe4b8
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.util;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.object.base.AdmObjectNode;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+
+/**
+ * Parameter names and type validation for the {@code WITH} clause of a dataset declaration.
+ */
+public final class DatasetDeclParametersUtil {
+    /* ***********************************************
+     * Merge Policy Parameters
+     * ***********************************************
+     */
+    public static final String MERGE_POLICY_PARAMETER_NAME = "merge-policy";
+    public static final String MERGE_POLICY_NAME_PARAMETER_NAME = "name";
+    public static final String MERGE_POLICY_PARAMETERS_PARAMETER_NAME = "parameters";
+    public static final String MERGE_POLICY_MERGABLE_SIZE_PARAMETER_NAME = "max-mergable-component-size";
+    public static final String MERGE_POLICY_TOLERANCE_COUNT_PARAMETER_NAME = "max-tolerance-component-count";
+    public static final String MERGE_POLICY_NUMBER_COMPONENTS_PARAMETER_NAME = "num-components";
+
+    /* ***********************************************
+     * Storage Block Compression Parameters
+     * ***********************************************
+     */
+    public static final String STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME = "storage-block-compression";
+    public static final String STORAGE_BLOCK_COMPRESSION_SCHEME_PARAMETER_NAME = "scheme";
+
+    /* ***********************************************
+     * Private members
+     * ***********************************************
+     */
+    // Expected type of the whole WITH object; built once from the constants above.
+    private static final ARecordType WITH_OBJECT_TYPE = getWithObjectType();
+
+    private DatasetDeclParametersUtil() {
+    }
+
+    /**
+     * Convert the WITH clause of a dataset declaration to an {@link AdmObjectNode} and validate it
+     * against the expected WITH object type.
+     *
+     * @param withRecord
+     *            the WITH clause, or {@code null} when the declaration has none
+     * @return the validated WITH object, or a new empty object when {@code withRecord} is {@code null}
+     * @throws CompilationException
+     *             if the WITH clause cannot be converted or fails type validation
+     */
+    public static AdmObjectNode validateAndGetWithObjectNode(RecordConstructor withRecord) throws CompilationException {
+        if (withRecord == null) {
+            // Return a fresh instance rather than a shared static one, so that a caller modifying the
+            // returned node cannot affect any other caller.
+            return new AdmObjectNode();
+        }
+        final ConfigurationTypeValidator validator = new ConfigurationTypeValidator();
+        final AdmObjectNode node = ExpressionUtils.toNode(withRecord);
+        validator.validateType(WITH_OBJECT_TYPE, node);
+        return node;
+    }
+
+    // WITH object: { merge-policy?: <merge policy type>, storage-block-compression?: <compression type> }
+    private static ARecordType getWithObjectType() {
+        final String[] withNames = { MERGE_POLICY_PARAMETER_NAME, STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME };
+        final IAType[] withTypes = { AUnionType.createUnknownableType(getMergePolicyType()),
+                AUnionType.createUnknownableType(getStorageBlockCompressionType()) };
+        return new ARecordType("withObject", withNames, withTypes, false);
+    }
+
+    // merge-policy: { name: string, parameters?: { three optional int64 fields } }
+    private static ARecordType getMergePolicyType() {
+        //merge-policy.parameters
+        final String[] parameterNames = { MERGE_POLICY_MERGABLE_SIZE_PARAMETER_NAME,
+                MERGE_POLICY_TOLERANCE_COUNT_PARAMETER_NAME, MERGE_POLICY_NUMBER_COMPONENTS_PARAMETER_NAME };
+        final IAType[] parametersTypes = { AUnionType.createUnknownableType(BuiltinType.AINT64),
+                AUnionType.createUnknownableType(BuiltinType.AINT64),
+                AUnionType.createUnknownableType(BuiltinType.AINT64) };
+        final ARecordType parameters =
+                new ARecordType(MERGE_POLICY_PARAMETERS_PARAMETER_NAME, parameterNames, parametersTypes, false);
+
+        //merge-policy
+        final String[] mergePolicyNames = { MERGE_POLICY_NAME_PARAMETER_NAME, MERGE_POLICY_PARAMETERS_PARAMETER_NAME };
+        final IAType[] mergePolicyTypes = { BuiltinType.ASTRING, AUnionType.createUnknownableType(parameters) };
+
+        return new ARecordType(MERGE_POLICY_PARAMETER_NAME, mergePolicyNames, mergePolicyTypes, false);
+    }
+
+    // storage-block-compression: { scheme: string }
+    private static ARecordType getStorageBlockCompressionType() {
+        final String[] schemeName = { STORAGE_BLOCK_COMPRESSION_SCHEME_PARAMETER_NAME };
+        final IAType[] schemeType = { BuiltinType.ASTRING };
+        return new ARecordType(STORAGE_BLOCK_COMPRESSION_PARAMETER_NAME, schemeName, schemeType, false);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java
index 25f9d07..6adb050 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/ExpressionUtils.java
@@ -39,13 +39,12 @@ import org.apache.asterix.object.base.AdmNullNode;
import org.apache.asterix.object.base.AdmObjectNode;
import org.apache.asterix.object.base.AdmStringNode;
import org.apache.asterix.object.base.IAdmNode;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
public class ExpressionUtils {
private ExpressionUtils() {
}
- public static IAdmNode toNode(Expression expr) throws AlgebricksException {
+ public static IAdmNode toNode(Expression expr) throws CompilationException {
switch (expr.getKind()) {
case LIST_CONSTRUCTOR_EXPRESSION:
return toNode((ListConstructor) expr);
@@ -58,7 +57,7 @@ public class ExpressionUtils {
}
}
- public static AdmObjectNode toNode(RecordConstructor recordConstructor) throws AlgebricksException {
+ public static AdmObjectNode toNode(RecordConstructor recordConstructor) throws CompilationException {
AdmObjectNode node = new AdmObjectNode();
final List<FieldBinding> fbList = recordConstructor.getFbList();
for (int i = 0; i < fbList.size(); i++) {
@@ -70,7 +69,7 @@ public class ExpressionUtils {
return node;
}
- private static IAdmNode toNode(ListConstructor listConstructor) throws AlgebricksException {
+ private static IAdmNode toNode(ListConstructor listConstructor) throws CompilationException {
final List<Expression> exprList = listConstructor.getExprList();
AdmArrayNode array = new AdmArrayNode(exprList.size());
for (int i = 0; i < exprList.size(); i++) {
@@ -79,7 +78,7 @@ public class ExpressionUtils {
return array;
}
- private static IAdmNode toNode(LiteralExpr literalExpr) throws AlgebricksException {
+ private static IAdmNode toNode(LiteralExpr literalExpr) throws CompilationException {
final Literal value = literalExpr.getValue();
final Literal.Type literalType = value.getLiteralType();
switch (literalType) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java
deleted file mode 100644
index 6bb5c36..0000000
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/MergePolicyUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.lang.common.util;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.object.base.AdmObjectNode;
-import org.apache.asterix.object.base.AdmStringNode;
-import org.apache.asterix.object.base.IAdmNode;
-
-public class MergePolicyUtils {
- public static final String MERGE_POLICY_PARAMETER_NAME = "merge-policy";
- public static final String MERGE_POLICY_NAME_PARAMETER_NAME = "name";
- public static final String MERGE_POLICY_PARAMETERS_PARAMETER_NAME = "parameters";
-
- private MergePolicyUtils() {
- }
-
- /**
- * Convert the parameters object to a Map<String,String>
- * This method should go away once we store the with object as it is in storage
- *
- * @param parameters
- * the parameters passed for the merge policy in the with clause
- * @return the parameters as a map
- */
- public static Map<String, String> toProperties(AdmObjectNode parameters) throws CompilationException {
- Map<String, String> map = new HashMap<>();
- for (Entry<String, IAdmNode> field : parameters.getFields()) {
- IAdmNode value = field.getValue();
- switch (value.getType()) {
- case BOOLEAN:
- case DOUBLE:
- case BIGINT:
- map.put(field.getKey(), value.toString());
- break;
- case STRING:
- map.put(field.getKey(), ((AdmStringNode) value).get());
- break;
- default:
- throw new CompilationException(ErrorCode.MERGE_POLICY_PARAMETER_INVALID_TYPE, value.getType());
- }
- }
- return map;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 95526f4..2380ab6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -87,6 +87,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -338,7 +339,8 @@ public class MetadataBootstrap {
new AsterixVirtualBufferCacheProvider(datasetId),
storageComponentProvider.getIoOperationSchedulerProvider(),
appContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, true,
- bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, null);
+ bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, null,
+ NoOpCompressorDecompressorFactory.INSTANCE);
DatasetLocalResourceFactory dsLocalResourceFactory =
new DatasetLocalResourceFactory(datasetId, lsmBtreeFactory);
// TODO(amoudi) Creating the index should be done through the same code path as
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index ba1ea03..d9309d9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -163,6 +163,11 @@ public final class MetadataRecordTypes {
public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 11;
public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 12;
public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 13;
+ //Optional open fields
+ public static final String DATASET_ARECORD_BLOCK_LEVEL_STORAGE_COMPRESSION_FIELD_NAME =
+ "BlockLevelStorageCompression";
+ public static final String DATASET_ARECORD_DATASET_COMPRESSION_SCHEME_FIELD_NAME = "DatasetCompressionScheme";
+ public static final String DATASET_ARECORD_REBALANCE_FIELD_NAME = "rebalanceCount";
public static final ARecordType DATASET_RECORDTYPE = createRecordType(
// RecordTypeName
RECORD_NAME_DATASET,
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
index f5cfbb3..62a03ad 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
@@ -31,6 +31,9 @@ import org.apache.hyracks.algebricks.common.utils.Pair;
*/
public class DatasetHints {
+    // Private constructor: this class is never instantiated.
+    private DatasetHints() {
+        // intentionally empty
+    }
+
/**
* validate the use of a hint
*
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
index 301aafb..2a1d551 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
@@ -38,6 +38,7 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
@@ -50,6 +51,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
public class BTreeResourceFactoryProvider implements IResourceFactoryProvider {
@@ -93,11 +95,20 @@ public class BTreeResourceFactoryProvider implements IResourceFactoryProvider {
case INTERNAL:
AsterixVirtualBufferCacheProvider vbcProvider =
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId());
+
+ final ICompressorDecompressorFactory compDecompFactory;
+ if (index.isPrimaryIndex()) {
+ //Compress only primary index
+ compDecompFactory = mdProvider.getCompressionManager().getFactory(dataset.getCompressionScheme());
+ } else {
+ compDecompFactory = NoOpCompressorDecompressorFactory.INSTANCE;
+ }
+
return new LSMBTreeLocalResourceFactory(storageManager, typeTraits, cmpFactories, filterTypeTraits,
filterCmpFactories, filterFields, opTrackerFactory, ioOpCallbackFactory,
metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory,
mergePolicyProperties, true, bloomFilterFields, bloomFilterFalsePositiveRate,
- index.isPrimaryIndex(), btreeFields);
+ index.isPrimaryIndex(), btreeFields, compDecompFactory);
default:
throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE,
dataset.getDatasetType().toString());
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index e212d11..85beb95 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -37,6 +37,7 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.metadata.LockList;
+import org.apache.asterix.common.storage.ICompressionManager;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.common.utils.StoragePathUtil;
@@ -1616,4 +1617,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
public ITxnIdFactory getTxnIdFactory() {
return appCtx.getTxnIdFactory();
}
+
+    /**
+     * @return the {@link ICompressionManager} obtained from {@code appCtx}; used to resolve a dataset's
+     *         compression scheme name to a compressor/decompressor factory
+     */
+    public ICompressionManager getCompressionManager() {
+        return this.appCtx.getCompressionManager();
+    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index a25ed20..855cf78 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -25,8 +25,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.IntStream;
-import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.active.IActiveNotificationHandler;
import org.apache.asterix.common.api.IDatasetInfoProvider;
import org.apache.asterix.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -65,7 +63,7 @@ import org.apache.asterix.metadata.utils.RTreeResourceFactoryProvider;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.compression.CompressionManager;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
@@ -149,6 +147,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
private final String metaTypeName;
private final long rebalanceCount;
private int pendingOp;
+ private final String compressionScheme;
public Dataset(String dataverseName, String datasetName, String recordTypeDataverseName, String recordTypeName,
String nodeGroupName, String compactionPolicy, Map<String, String> compactionPolicyProperties,
@@ -156,29 +155,30 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
int pendingOp) {
this(dataverseName, datasetName, recordTypeDataverseName, recordTypeName, /*metaTypeDataverseName*/null,
/*metaTypeName*/null, nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails,
- hints, datasetType, datasetId, pendingOp);
+ hints, datasetType, datasetId, pendingOp, CompressionManager.NONE);
}
public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName,
String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy,
Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints,
- DatasetType datasetType, int datasetId, int pendingOp) {
+ DatasetType datasetType, int datasetId, int pendingOp, String compressionScheme) {
this(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, metaItemTypeDataverseName,
metaItemTypeName, nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints,
- datasetType, datasetId, pendingOp, 0L);
+ datasetType, datasetId, pendingOp, 0L, compressionScheme);
}
public Dataset(Dataset dataset) {
this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName,
dataset.metaTypeDataverseName, dataset.metaTypeName, dataset.nodeGroupName,
dataset.compactionPolicyFactory, dataset.compactionPolicyProperties, dataset.datasetDetails,
- dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp, dataset.rebalanceCount);
+ dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp, dataset.rebalanceCount,
+ dataset.compressionScheme);
}
public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName,
String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy,
Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints,
- DatasetType datasetType, int datasetId, int pendingOp, long rebalanceCount) {
+ DatasetType datasetType, int datasetId, int pendingOp, long rebalanceCount, String compressionScheme) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.recordTypeName = itemTypeName;
@@ -194,6 +194,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
this.pendingOp = pendingOp;
this.hints = hints;
this.rebalanceCount = rebalanceCount;
+ this.compressionScheme = compressionScheme;
}
@Override
@@ -357,7 +358,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
new Dataset(dataverseName, datasetName, getItemTypeDataverseName(), getItemTypeName(),
getMetaItemTypeDataverseName(), getMetaItemTypeName(), getNodeGroupName(),
getCompactionPolicy(), getCompactionPolicyProperties(), getDatasetDetails(), getHints(),
- getDatasetType(), getDatasetId(), MetadataUtil.PENDING_DROP_OP));
+ getDatasetType(), getDatasetId(), MetadataUtil.PENDING_DROP_OP, getCompressionScheme()));
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
bActiveTxn.setValue(false);
@@ -644,6 +645,8 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
tree.put("metaTypeDataverseName", metaTypeDataverseName);
tree.put("metaTypeName", metaTypeName);
tree.put("pendingOp", MetadataUtil.pendingOpToString(pendingOp));
+ tree.put("rebalanceCount", rebalanceCount);
+ tree.put("compressionScheme", compressionScheme);
return tree;
}
@@ -823,7 +826,8 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
return new Dataset(this.dataverseName, this.datasetName, this.recordTypeDataverseName, this.recordTypeName,
this.metaTypeDataverseName, this.metaTypeName, targetNodeGroupName, this.compactionPolicyFactory,
this.compactionPolicyProperties, this.datasetDetails, this.hints, this.datasetType,
- DatasetIdFactory.generateAlternatingDatasetId(this.datasetId), this.pendingOp, this.rebalanceCount + 1);
+ DatasetIdFactory.generateAlternatingDatasetId(this.datasetId), this.pendingOp, this.rebalanceCount + 1,
+ this.compressionScheme);
}
// Gets an array of partition numbers for this dataset.
@@ -840,4 +844,8 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
public String getFullyQualifiedName() {
return dataverseName + '.' + datasetName;
}
+
+    /**
+     * @return the name of this dataset's block-level storage compression scheme
+     *         ({@code CompressionManager.NONE} when the dataset is not compressed)
+     */
+    public String getCompressionScheme() {
+        return this.compressionScheme;
+    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index d5e179b..27978ab 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -60,10 +60,13 @@ import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.base.AUnorderedList;
import org.apache.asterix.om.base.IACursor;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.AUnorderedListType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.compression.CompressionManager;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -77,7 +80,6 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
private static final long serialVersionUID = 1L;
// Payload field containing serialized Dataset.
public static final int DATASET_PAYLOAD_TUPLE_FIELD_INDEX = 2;
- private static final String REBALANCE_ID_FIELD_NAME = "rebalanceCount";
@SuppressWarnings("unchecked")
protected final ISerializerDeserializer<ARecord> recordSerDes =
@@ -256,14 +258,33 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
metaTypeName = ((AString) datasetRecord.getValueByPos(metaTypeNameIndex)).getStringValue();
}
- // Read the rebalance count if there is one.
- int rebalanceCountIndex = datasetRecord.getType().getFieldIndex(REBALANCE_ID_FIELD_NAME);
- long rebalanceCount = rebalanceCountIndex >= 0
- ? ((AInt64) datasetRecord.getValueByPos(rebalanceCountIndex)).getLongValue() : 0;
+ long rebalanceCount = getRebalanceCount(datasetRecord);
+ String compressionScheme = getCompressionScheme(datasetRecord);
return new Dataset(dataverseName, datasetName, typeDataverseName, typeName, metaTypeDataverseName, metaTypeName,
nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType,
- datasetId, pendingOp, rebalanceCount);
+ datasetId, pendingOp, rebalanceCount, compressionScheme);
+ }
+
+ private long getRebalanceCount(ARecord datasetRecord) {
+ // Read the rebalance count if there is one.
+ int rebalanceCountIndex =
+ datasetRecord.getType().getFieldIndex(MetadataRecordTypes.DATASET_ARECORD_REBALANCE_FIELD_NAME);
+ return rebalanceCountIndex >= 0 ? ((AInt64) datasetRecord.getValueByPos(rebalanceCountIndex)).getLongValue()
+ : 0;
+ }
+
+    /**
+     * Reads the dataset's compression scheme from the optional open field {@code BlockLevelStorageCompression},
+     * a nested record whose {@code DatasetCompressionScheme} field holds the scheme name.
+     *
+     * @return the stored scheme name, or {@link CompressionManager#NONE} when the field is absent
+     */
+    private String getCompressionScheme(ARecord datasetRecord) {
+        final ARecordType datasetType = datasetRecord.getType();
+        final int compressionIndex = datasetType
+                .getFieldIndex(MetadataRecordTypes.DATASET_ARECORD_BLOCK_LEVEL_STORAGE_COMPRESSION_FIELD_NAME);
+        if (compressionIndex >= 0) {
+            // The scheme lives inside the nested compression record; its position is relative to that
+            // record, so it must be read from the nested record and not from datasetRecord.
+            final ARecord compressionRecord = (ARecord) datasetRecord.getValueByPos(compressionIndex);
+            final int schemeIndex = compressionRecord.getType()
+                    .getFieldIndex(MetadataRecordTypes.DATASET_ARECORD_DATASET_COMPRESSION_SCHEME_FIELD_NAME);
+            return ((AString) compressionRecord.getValueByPos(schemeIndex)).getStringValue();
+        }
+        return CompressionManager.NONE;
     }
@Override
@@ -392,8 +413,19 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
return tuple;
}
+ /**
+ * Keep protected to allow other extensions to add additional fields
+ *
+ * @param dataset
+ * @throws HyracksDataException
+ */
protected void writeOpenFields(Dataset dataset) throws HyracksDataException {
- // write open fields
+ writeMetaPart(dataset);
+ writeRebalanceCount(dataset);
+ writeBlockLevelStorageCompression(dataset);
+ }
+
+ private void writeMetaPart(Dataset dataset) throws HyracksDataException {
if (dataset.hasMetaPart()) {
// write open field 1, the meta item type Dataverse name.
fieldName.reset();
@@ -413,10 +445,35 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(fieldName, fieldValue);
}
+ }
+
+ /**
+ * Adds the optional open field BlockLevelStorageCompression to the dataset record being built.
+ * Its value is a nested open record with a single field DatasetCompressionScheme holding the scheme name.
+ * Nothing is written for CompressionManager.NONE; getCompressionScheme(ARecord) maps a missing field
+ * back to NONE when reading.
+ */
+ private void writeBlockLevelStorageCompression(Dataset dataset) throws HyracksDataException {
+ if (CompressionManager.NONE.equals(dataset.getCompressionScheme())) {
+ return;
+ }
+ // Build the nested object { DatasetCompressionScheme: <scheme name> }.
+ RecordBuilder compressionObject = new RecordBuilder();
+ compressionObject.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ fieldName.reset();
+ aString.setValue(MetadataRecordTypes.DATASET_ARECORD_DATASET_COMPRESSION_SCHEME_FIELD_NAME);
+ stringSerde.serialize(aString, fieldName.getDataOutput());
+ fieldValue.reset();
+ aString.setValue(dataset.getCompressionScheme());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ compressionObject.addField(fieldName, fieldValue);
+
+ // Reuse the shared fieldName/fieldValue buffers for the outer field; they are reset before being
+ // rewritten, so the nested field must already have been added above.
+ fieldName.reset();
+ aString.setValue(MetadataRecordTypes.DATASET_ARECORD_BLOCK_LEVEL_STORAGE_COMPRESSION_FIELD_NAME);
+ stringSerde.serialize(aString, fieldName.getDataOutput());
+ fieldValue.reset();
+ // NOTE(review): the `true` flag presumably makes RecordBuilder write the type tag — confirm.
+ compressionObject.write(fieldValue.getDataOutput(), true);
+ recordBuilder.addField(fieldName, fieldValue);
+ }
+
+ private void writeRebalanceCount(Dataset dataset) throws HyracksDataException {
if (dataset.getRebalanceCount() > 0) {
// Adds the field rebalanceCount.
fieldName.reset();
- aString.setValue("rebalanceCount");
+ aString.setValue(MetadataRecordTypes.DATASET_ARECORD_REBALANCE_FIELD_NAME);
stringSerde.serialize(aString, fieldName.getDataOutput());
fieldValue.reset();
aBigInt.setValue(dataset.getRebalanceCount());
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
index b87ef2a..902ee41 100644
--- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
@@ -24,12 +24,12 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure;
import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.runtime.compression.CompressionManager;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.junit.Assert;
@@ -51,9 +51,9 @@ public class DatasetTupleTranslatorTest {
indicator == null ? null : Collections.singletonList(indicator),
Collections.singletonList(BuiltinType.AINT64), false, Collections.emptyList());
- Dataset dataset =
- new Dataset("test", "log", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES", "prefix",
- compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0);
+ Dataset dataset = new Dataset("test", "log", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES",
+ "prefix", compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0,
+ CompressionManager.NONE);
DatasetTupleTranslator dtTranslator = new DatasetTupleTranslator(true);
ITupleReference tuple = dtTranslator.getTupleFromMetadataEntity(dataset);
Dataset deserializedDataset = dtTranslator.getMetadataEntityFromTuple(tuple);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
index ab4229c..7080dee 100644
--- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
@@ -30,7 +30,6 @@ import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Datatype;
@@ -41,6 +40,7 @@ import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningS
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.compression.CompressionManager;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.junit.Assert;
@@ -62,9 +62,9 @@ public class IndexTupleTranslatorTest {
indicator == null ? null : Collections.singletonList(indicator),
Collections.singletonList(BuiltinType.AINT64), false, Collections.emptyList());
- Dataset dataset =
- new Dataset("test", "d1", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES", "prefix",
- compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0);
+ Dataset dataset = new Dataset("test", "d1", "foo", "LogType", "CB", "MetaType", "DEFAULT_NG_ALL_NODES",
+ "prefix", compactionPolicyProperties, details, Collections.emptyMap(), DatasetType.INTERNAL, 115, 0,
+ CompressionManager.NONE);
Index index = new Index("test", "d1", "i1", IndexType.BTREE,
Collections.singletonList(Collections.singletonList("row_id")),
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
index 99b7176..6d2658b 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
@@ -153,6 +153,14 @@ public class AdmObjectNode implements IAdmNode {
return getString(this, field);
}
+    /**
+     * Looks up {@code field} and returns its string value.
+     *
+     * @param field
+     *            the field name to look up
+     * @return the value of the field, or {@code null} when the field is not present
+     *         NOTE(review): the value is cast to {@link AdmStringNode} unchecked, so a present
+     *         non-string value surfaces as a ClassCastException.
+     */
+    public String getOptionalString(String field) {
+        final IAdmNode value = get(field);
+        return value == null ? null : ((AdmStringNode) value).get();
+    }
+
public static String getString(AdmObjectNode openFields, String field) throws HyracksDataException {
IAdmNode node = openFields.get(field);
if (node == null) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java
new file mode 100644
index 0000000..3bffa9a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/compression/CompressionManager.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.compression;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.storage.ICompressionManager;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.storage.common.compression.SnappyCompressorDecompressorFactory;
+
+/**
+ * Resolves storage compression scheme names to {@link ICompressorDecompressorFactory} implementations.
+ * <p>
+ * Scheme names are matched case-insensitively and are always returned/stored in their canonical
+ * lower-case form, so any name accepted by {@link #getDdlOrDefaultCompressionScheme(String)} can be
+ * resolved by {@link #getFactory(String)}.
+ */
+public class CompressionManager implements ICompressionManager {
+    private static final Map<String, Class<? extends ICompressorDecompressorFactory>> REGISTERED_SCHEMES =
+            getRegisteredSchemes();
+    public static final String NONE = "none";
+    /** Configured default scheme, validated and normalized to lower case by the constructor. */
+    private final String defaultScheme;
+
+    /*
+     * New compression schemes can be added by registering the name and the factory class.
+     * Names must be registered in lower case: all lookups are normalized to lower case.
+     *
+     * WARNING: Changing a scheme name will break storage backward compatibility. Before upgrading to a newer
+     * version of the registered schemes, make sure it is also backward-compatible with the previous version.
+     */
+    private static Map<String, Class<? extends ICompressorDecompressorFactory>> getRegisteredSchemes() {
+        final Map<String, Class<? extends ICompressorDecompressorFactory>> registeredSchemes = new HashMap<>();
+        //No compression
+        registeredSchemes.put(NONE, NoOpCompressorDecompressorFactory.class);
+        registeredSchemes.put("snappy", SnappyCompressorDecompressorFactory.class);
+        return registeredSchemes;
+    }
+
+    /**
+     * @param storageProperties
+     *            storage configuration supplying the default compression scheme
+     * @throws IllegalStateException
+     *             if the configured default scheme is not a registered scheme
+     */
+    public CompressionManager(StorageProperties storageProperties) {
+        validateCompressionConfiguration(storageProperties);
+        // Normalize so the stored default always matches a REGISTERED_SCHEMES key.
+        defaultScheme = normalizeScheme(storageProperties.getCompressionScheme());
+    }
+
+    /**
+     * Creates a new factory instance for the given scheme, or for the default scheme when
+     * {@code schemeName} is {@code null}.
+     *
+     * @throws CompilationException
+     *             if {@code schemeName} is not a registered scheme
+     */
+    @Override
+    public ICompressorDecompressorFactory getFactory(String schemeName) throws CompilationException {
+        final String scheme = getDdlOrDefaultCompressionScheme(schemeName);
+        final Class<? extends ICompressorDecompressorFactory> clazz = REGISTERED_SCHEMES.get(scheme);
+        try {
+            // Class.newInstance() is deprecated; the Constructor-based call wraps constructor exceptions properly.
+            return clazz.getDeclaredConstructor().newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException("Failed to instantiate compressor/decompressor: " + scheme, e);
+        }
+    }
+
+    /**
+     * Returns the canonical (lower-case) name of {@code ddlScheme}, or the configured default scheme when
+     * {@code ddlScheme} is {@code null}.
+     *
+     * @throws CompilationException
+     *             if {@code ddlScheme} is not a registered scheme
+     */
+    @Override
+    public String getDdlOrDefaultCompressionScheme(String ddlScheme) throws CompilationException {
+        if (ddlScheme == null) {
+            return defaultScheme;
+        }
+        if (!isRegisteredScheme(ddlScheme)) {
+            throw new CompilationException(ErrorCode.UNKNOWN_COMPRESSION_SCHEME, ddlScheme, formatSupportedValues());
+        }
+        // Validation is case-insensitive, so return the normalized key that REGISTERED_SCHEMES actually holds.
+        return normalizeScheme(ddlScheme);
+    }
+
+    /**
+     * Register factory classes for persisted resources
+     *
+     * @param registeredClasses
+     *            map from simple class name to class, populated with every registered factory class
+     */
+    public static void registerCompressorDecompressorsFactoryClasses(
+            Map<String, Class<? extends IJsonSerializable>> registeredClasses) {
+        for (Class<? extends ICompressorDecompressorFactory> clazz : REGISTERED_SCHEMES.values()) {
+            registeredClasses.put(clazz.getSimpleName(), clazz);
+        }
+    }
+
+    /**
+     * @param schemeName
+     *            Compression scheme name (any case)
+     * @return
+     *         true if it is registered
+     */
+    private static boolean isRegisteredScheme(String schemeName) {
+        return schemeName != null && REGISTERED_SCHEMES.containsKey(normalizeScheme(schemeName));
+    }
+
+    /**
+     * Maps a scheme name to its canonical registry key. Locale.ROOT avoids locale-sensitive case mapping.
+     */
+    private static String normalizeScheme(String schemeName) {
+        return schemeName.toLowerCase(Locale.ROOT);
+    }
+
+    /**
+     * Validate the configuration of StorageProperties
+     *
+     * @param storageProperties
+     *            storage configuration to check
+     * @throws IllegalStateException
+     *             if the configured scheme is missing or not registered
+     */
+    private static void validateCompressionConfiguration(StorageProperties storageProperties) {
+        final String value = storageProperties.getCompressionScheme();
+        if (!isRegisteredScheme(value)) {
+            final String option = StorageProperties.Option.STORAGE_COMPRESSION_BLOCK.ini();
+            throw new IllegalStateException("Invalid compression configuration (" + option + " = " + value
+                    + "). Valid values are: " + formatSupportedValues());
+        }
+    }
+
+    /**
+     * Formats the registered scheme names as {@code [a,b,...]} in the registry's iteration order.
+     */
+    private static String formatSupportedValues() {
+        final StringBuilder schemes = new StringBuilder();
+        final Iterator<String> iterator = REGISTERED_SCHEMES.keySet().iterator();
+        schemes.append('[');
+        schemes.append(iterator.next());
+        while (iterator.hasNext()) {
+            schemes.append(',');
+            schemes.append(iterator.next());
+        }
+        schemes.append(']');
+        return schemes.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 4157e16..0d2a1df 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -24,7 +24,6 @@ import java.util.function.Supplier;
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.api.INodeJobTracker;
-import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.ActiveProperties;
@@ -44,7 +43,10 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
+import org.apache.asterix.common.storage.ICompressionManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.runtime.compression.CompressionManager;
import org.apache.asterix.runtime.job.listener.NodeJobTracker;
import org.apache.asterix.runtime.transaction.ResourceIdManager;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -87,6 +89,7 @@ public class CcApplicationContext implements ICcApplicationContext {
private IClusterStateManager clusterStateManager;
private final INodeJobTracker nodeJobTracker;
private final ITxnIdFactory txnIdFactory;
+ private final ICompressionManager compressionManager;
public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
@@ -121,6 +124,7 @@ public class CcApplicationContext implements ICcApplicationContext {
this.resourceIdManager = new ResourceIdManager(clusterStateManager);
nodeJobTracker = new NodeJobTracker();
txnIdFactory = new BulkTxnIdFactory();
+ compressionManager = new CompressionManager(storageProperties);
}
@@ -270,7 +274,13 @@ public class CcApplicationContext implements ICcApplicationContext {
return NoOpCoordinationService.INSTANCE;
}
+ /**
+ * Returns the transaction id factory created in this context's constructor (a {@code BulkTxnIdFactory}).
+ */
+ @Override
public ITxnIdFactory getTxnIdFactory() {
return txnIdFactory;
}
+
+    /**
+     * Returns the compression manager built from this context's storage properties in the constructor.
+     *
+     * @return the shared {@link ICompressionManager} for this cluster controller context
+     */
+    @Override
+    public ICompressionManager getCompressionManager() {
+        return this.compressionManager;
+    }
}