You are viewing a plain text version of this content. The canonical link for it is not available in this plain-text rendering.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/02/01 09:31:28 UTC
[09/13] incubator-asterixdb git commit: Add Support for Upsert
Operation
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 14a3e2a..d523c6e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -636,11 +636,11 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
token = admLexer.next();
this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput());
if (openRecordField) {
- if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
+ if (fieldValueBuffer.getByteArray()[0] != ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
}
} else if (NonTaggedFormatUtil.isOptional(recType)) {
- if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
+ if (fieldValueBuffer.getByteArray()[0] != ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
recBuilder.addField(fieldId, fieldValueBuffer);
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-lang-aql/src/main/javacc/AQL.jj
----------------------------------------------------------------------
diff --git a/asterix-lang-aql/src/main/javacc/AQL.jj b/asterix-lang-aql/src/main/javacc/AQL.jj
index 8f62f74..93e3f68 100644
--- a/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -124,6 +124,7 @@ import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.common.statement.TypeDecl;
import org.apache.asterix.lang.common.statement.TypeDropStatement;
import org.apache.asterix.lang.common.statement.UpdateStatement;
+import org.apache.asterix.lang.common.statement.UpsertStatement;
import org.apache.asterix.lang.common.statement.WriteStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.struct.QuantifiedPair;
@@ -884,12 +885,17 @@ InsertStatement InsertStatement() throws ParseException:
{
Pair<Identifier,Identifier> nameComponents = null;
Query query;
+ boolean upsert = false;
}
{
- "insert" "into" <DATASET> nameComponents = QualifiedName() query = Query()
+ ("insert"|"upsert"{ upsert = true; }) "into" <DATASET> nameComponents = QualifiedName() query = Query()
{
query.setTopLevel(true);
- return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+ if(upsert){
+ return new UpsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+ } else{
+ return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
----------------------------------------------------------------------
diff --git a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index 7cf12c7..3184d1e 100644
--- a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++ b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -26,6 +26,7 @@ public interface Statement extends ILangExpression {
DATASET_DROP,
DELETE,
INSERT,
+ UPSERT,
UPDATE,
DML_CMD_LIST,
FUNCTION_DECL,
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
new file mode 100644
index 0000000..f415951
--- /dev/null
+++ b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.statement;
+
+import org.apache.asterix.lang.common.struct.Identifier;
+
+public class UpsertStatement extends InsertStatement {
+
+ public UpsertStatement(Identifier dataverseName, Identifier datasetName, Query query, int varCounter) {
+ super(dataverseName, datasetName, query, varCounter);
+ }
+
+ @Override
+ public Kind getKind() {
+ return Kind.UPSERT;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index f15540a..f3523da 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -101,6 +101,9 @@ import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.AsterixLSMInvertedIndexUpsertOperatorDescriptor;
+import org.apache.asterix.runtime.operators.AsterixLSMTreeUpsertOperatorDescriptor;
+import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
@@ -110,6 +113,7 @@ import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOpera
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -1184,7 +1188,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
- fieldPermutation, indexOp, idfh, null, modificationCallbackFactory, true, indexName);
+ fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
+ NoOpOperationCallbackFactory.INSTANCE);
}
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
@@ -1670,7 +1675,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
filterCmpFactories, btreeFields, filterFields, !temp),
- filterFactory, modificationCallbackFactory, false, indexName);
+ filterFactory, false, indexName, null, modificationCallbackFactory,
+ NoOpOperationCallbackFactory.INSTANCE);
}
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
} catch (MetadataException e) {
@@ -2029,7 +2035,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
filterTypeTraits, filterCmpFactories, filterFields, !temp),
- filterFactory, modificationCallbackFactory, false, indexName);
+ filterFactory, false, indexName, null, modificationCallbackFactory,
+ NoOpOperationCallbackFactory.INSTANCE);
}
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
} catch (MetadataException | IOException e) {
@@ -2237,4 +2244,721 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
throw new AlgebricksException(e);
}
}
+
+ //TODO: refactor this method
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
+ LogicalVariable prevPayload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ String datasetName = dataSource.getId().getDatasourceName();
+ Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException(
+ "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
+ }
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+ int numKeys = primaryKeys.size();
+ int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+ // Move key fields to front. {keys, record, filters}
+ int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
+ int[] bloomFilterKeyFields = new int[numKeys];
+ int i = 0;
+ // set the keys' permutations
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ bloomFilterKeyFields[i] = i;
+ i++;
+ }
+ // set the record permutation
+ fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+ // set the filters' permutations.
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(filterKeys.get(0));
+ fieldPermutation[numKeys + 1] = idx;
+ }
+
+ try {
+ Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), dataset.getDatasetName());
+ String indexName = primaryIndex.getIndexName();
+
+ String itemTypeName = dataset.getItemTypeName();
+ ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+ .getDatatype(mdTxnCtx, dataSource.getId().getDataverseName(), itemTypeName).getDatatype();
+
+ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
+
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+ itemType, context.getBinaryComparatorFactoryProvider());
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
+ dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+
+ // prepare callback
+ JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ int[] primaryKeyFields = new int[numKeys];
+ for (i = 0; i < numKeys; i++) {
+ primaryKeyFields[i] = i;
+ }
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ itemType, context.getBinaryComparatorFactoryProvider());
+ int[] filterFields = DatasetUtils.createFilterFields(dataset);
+ int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+ TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE)
+ : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
+ IndexOperation.UPSERT, ResourceType.LSM_BTREE);
+
+ LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
+ jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, !temp);
+ AsterixLSMTreeUpsertOperatorDescriptor op;
+
+ ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount() + 1 + numFilterFields];
+ ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount() + 1
+ + numFilterFields];
+ for (int j = 0; j < recordDesc.getFieldCount(); j++) {
+ outputTypeTraits[j] = recordDesc.getTypeTraits()[j];
+ outputSerDes[j] = recordDesc.getFields()[j];
+ }
+ outputSerDes[outputSerDes.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat().getSerdeProvider()
+ .getSerializerDeserializer(itemType);
+ outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
+ .getTypeTraitProvider().getTypeTrait(itemType);
+ int fieldIdx = -1;
+ if (numFilterFields > 0) {
+ String filterField = DatasetUtils.getFilterField(dataset).get(0);
+ for (i = 0; i < itemType.getFieldNames().length; i++) {
+ if (itemType.getFieldNames()[i].equals(filterField)) {
+ break;
+ }
+ }
+ fieldIdx = i;
+ outputTypeTraits[outputTypeTraits.length - 1] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
+ .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+ outputSerDes[outputSerDes.length - 1] = FormatUtils.getDefaultFormat().getSerdeProvider()
+ .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+ }
+
+ RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
+ op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+ idfh, null, true, indexName, context.getNullWriterFactory(), modificationCallbackFactory,
+ searchCallbackFactory, null);
+ op.setType(itemType);
+ op.setFilterIndex(fieldIdx);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+
+ } catch (MetadataException me) {
+ throw new AlgebricksException(me);
+ }
+ }
+
+ // TODO refactor this method
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+ IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+ ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ String indexName = dataSourceIndex.getId();
+ String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
+ String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
+
+ Dataset dataset = findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ Index secondaryIndex;
+ try {
+ secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ }
+ AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
+ ArrayList<LogicalVariable> prevAdditionalFilteringKeys = null;
+ if (prevAdditionalFilteringKey != null) {
+ prevAdditionalFilteringKeys = new ArrayList<LogicalVariable>();
+ prevAdditionalFilteringKeys.add(prevAdditionalFilteringKey);
+ }
+ switch (secondaryIndex.getIndexType()) {
+ case BTREE: {
+ return getBTreeUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
+ primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
+ prevSecondaryKeys, prevAdditionalFilteringKeys);
+ }
+ case RTREE: {
+ return getRTreeUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
+ primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
+ prevSecondaryKeys, prevAdditionalFilteringKeys);
+ }
+ case SINGLE_PARTITION_WORD_INVIX:
+ case SINGLE_PARTITION_NGRAM_INVIX:
+ case LENGTH_PARTITIONED_WORD_INVIX:
+ case LENGTH_PARTITIONED_NGRAM_INVIX: {
+ return getInvertedIndexUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
+ primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
+ secondaryIndex.getIndexType(), prevSecondaryKeys, prevAdditionalFilteringKeys);
+ }
+ default: {
+ throw new AlgebricksException(
+ "upsert is not implemented for index type: " + secondaryIndex.getIndexType());
+ }
+ }
+ }
+
+ //TODO: refactor this method
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexUpsertRuntime(String dataverseName,
+ String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
+ RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexType indexType,
+ List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+ throws AlgebricksException {
+ // Check the index is length-partitioned or not.
+ boolean isPartitioned;
+ if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+ || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
+ isPartitioned = true;
+ } else {
+ isPartitioned = false;
+ }
+
+ // Sanity checks.
+ if (primaryKeys.size() > 1) {
+ throw new AlgebricksException("Cannot create inverted index on dataset with composite primary key.");
+ }
+ // The size of secondaryKeys can be two if it receives input from its
+ // TokenizeOperator- [token, number of token]
+ if (secondaryKeys.size() > 1 && !isPartitioned) {
+ throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
+ } else if (secondaryKeys.size() > 2 && isPartitioned) {
+ throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
+ }
+
+ Dataset dataset = findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
+ }
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+ // For tokenization, sorting and loading.
+ // One token (+ optional partitioning field) + primary keys: [token,
+ // number of token, PK]
+ int numKeys = primaryKeys.size() + secondaryKeys.size();
+ int numTokenKeyPairFields = (!isPartitioned) ? 1 + primaryKeys.size() : 2 + primaryKeys.size();
+ int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+
+ // generate field permutations
+ int[] fieldPermutation = new int[numKeys + numFilterFields];
+ int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+ int i = 0;
+ int j = 0;
+
+ // If the index is partitioned: [token, number of token]
+ // Otherwise: [token]
+ for (LogicalVariable varKey : secondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ modificationCallbackPrimaryKeyFields[j] = i;
+ i++;
+ j++;
+ }
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
+ fieldPermutation[numKeys] = idx;
+ }
+
+ // Find permutations for prev value
+ int[] prevFieldPermutation = new int[numKeys + numFilterFields];
+ i = 0;
+
+ // If the index is partitioned: [token, number of token]
+ // Otherwise: [token]
+ for (LogicalVariable varKey : prevSecondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ prevFieldPermutation[i] = idx;
+ i++;
+ }
+
+ for (int k = 0; k < primaryKeys.size(); k++) {
+ prevFieldPermutation[k + i] = fieldPermutation[k + i];
+ i++;
+ }
+
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+ prevFieldPermutation[numKeys] = idx;
+ }
+
+ String itemTypeName = dataset.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
+ .getDatatype();
+
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Only record types can be indexed.");
+ }
+
+ ARecordType recType = (ARecordType) itemType;
+
+ // Index parameters.
+ Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+
+ List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
+ List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+
+ int numTokenFields = 0;
+
+ // SecondaryKeys.size() can be two if it comes from the bulkload.
+ // In this case, [token, number of token] are the secondaryKeys.
+ if (!isPartitioned || secondaryKeys.size() > 1) {
+ numTokenFields = secondaryKeys.size();
+ } else if (isPartitioned && secondaryKeys.size() == 1) {
+ numTokenFields = secondaryKeys.size() + 1;
+ }
+
+ ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
+ ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
+ IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
+ IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+ dataset, recType, context.getBinaryComparatorFactoryProvider());
+
+ IAType secondaryKeyType = null;
+
+ Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+ secondaryKeyExprs.get(0), recType);
+ secondaryKeyType = keyPairType.first;
+
+ List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+
+ i = 0;
+ for (List<String> partitioningKey : partitioningKeys) {
+ IAType keyType = recType.getSubFieldType(partitioningKey);
+ invListsTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
+
+ tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+ tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+ if (isPartitioned) {
+ // The partitioning field is hardcoded to be a short *without*
+ // an Asterix type tag.
+ tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+ tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+ }
+ IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
+ secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ recType, context.getBinaryComparatorFactoryProvider());
+
+ int[] filterFields = null;
+ int[] invertedIndexFields = null;
+ int[] filterFieldsForNonBulkLoadOps = null;
+ int[] invertedIndexFieldsForNonBulkLoadOps = null;
+ if (filterTypeTraits != null) {
+ filterFields = new int[1];
+ filterFields[0] = numTokenFields + primaryKeys.size();
+ invertedIndexFields = new int[numTokenFields + primaryKeys.size()];
+ for (int k = 0; k < invertedIndexFields.length; k++) {
+ invertedIndexFields[k] = k;
+ }
+
+ filterFieldsForNonBulkLoadOps = new int[numFilterFields];
+ filterFieldsForNonBulkLoadOps[0] = numTokenKeyPairFields;
+ invertedIndexFieldsForNonBulkLoadOps = new int[numTokenKeyPairFields];
+ for (int k = 0; k < invertedIndexFieldsForNonBulkLoadOps.length; k++) {
+ invertedIndexFieldsForNonBulkLoadOps[k] = k;
+ }
+ }
+
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
+ dataverseName, datasetName, indexName, temp);
+
+ // prepare callback
+ JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+ ResourceType.LSM_INVERTED_INDEX)
+ : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+ ResourceType.LSM_INVERTED_INDEX);
+
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory indexDataFlowFactory;
+ if (!isPartitioned) {
+ indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+ filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+ invertedIndexFieldsForNonBulkLoadOps, !temp);
+ } else {
+ indexDataFlowFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+ compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+ filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+ invertedIndexFieldsForNonBulkLoadOps, !temp);
+ }
+ IOperatorDescriptor op = new AsterixLSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+ appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+ invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation,
+ indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName, prevFieldPermutation);
+
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ //TODO: refactor this method
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeUpsertRuntime(String dataverseName,
+ String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
+ RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
+ List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+ throws AlgebricksException {
+ try {
+ Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+ String itemTypeName = dataset.getItemTypeName();
+ IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, itemTypeName).getDatatype();
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Only record types can be indexed.");
+ }
+ ARecordType recType = (ARecordType) itemType;
+ Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+
+ List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
+ List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+ Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+ secondaryKeyExprs.get(0), recType);
+ IAType spatialType = keyPairType.first;
+
+ int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+ int numSecondaryKeys = dimension * 2;
+ int numPrimaryKeys = primaryKeys.size();
+ int numKeys = numSecondaryKeys + numPrimaryKeys;
+ ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
+
+ int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+ int[] fieldPermutation = new int[numKeys + numFilterFields];
+ int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+ int i = 0;
+ int j = 0;
+
+ // Get field permutation for new value
+ for (LogicalVariable varKey : secondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ modificationCallbackPrimaryKeyFields[j] = i;
+ i++;
+ j++;
+ }
+
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
+ fieldPermutation[numKeys] = idx;
+ }
+
+ // Get field permutation for previous value
+ int[] prevFieldPermutation = new int[numKeys + numFilterFields];
+ i = 0;
+
+ // Get field permutation for new value
+ for (LogicalVariable varKey : prevSecondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ prevFieldPermutation[i] = idx;
+ i++;
+ }
+ for (int k = 0; k < numPrimaryKeys; k++) {
+ prevFieldPermutation[k + i] = fieldPermutation[k + i];
+ i++;
+ }
+
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+ prevFieldPermutation[numKeys] = idx;
+ }
+
+ IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+ IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
+ for (i = 0; i < numSecondaryKeys; i++) {
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(nestedKeyType, true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+ valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+ }
+ List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+ for (List<String> partitioningKey : partitioningKeys) {
+ IAType keyType = recType.getSubFieldType(partitioningKey);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
+
+ IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+ dataset, recType, context.getBinaryComparatorFactoryProvider());
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
+ dataverseName, datasetName, indexName, temp);
+ int[] btreeFields = new int[primaryComparatorFactories.length];
+ for (int k = 0; k < btreeFields.length; k++) {
+ btreeFields[k] = k + numSecondaryKeys;
+ }
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ recType, context.getBinaryComparatorFactoryProvider());
+ int[] filterFields = null;
+ int[] rtreeFields = null;
+ if (filterTypeTraits != null) {
+ filterFields = new int[1];
+ filterFields[0] = numSecondaryKeys + numPrimaryKeys;
+ rtreeFields = new int[numSecondaryKeys + numPrimaryKeys];
+ for (int k = 0; k < rtreeFields.length; k++) {
+ rtreeFields[k] = k;
+ }
+ }
+
+ // prepare callback
+ JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+ ResourceType.LSM_RTREE)
+ : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+ ResourceType.LSM_RTREE);
+
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
+ AsterixLSMTreeUpsertOperatorDescriptor op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation,
+ new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+ primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMRTreeIOOperationCallbackFactory.INSTANCE,
+ proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
+ storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
+ filterTypeTraits, filterCmpFactories, filterFields, !temp),
+ filterFactory, false, indexName, null, modificationCallbackFactory,
+ NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+ } catch (MetadataException | IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ //TODO: refactor this method
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeUpsertRuntime(String dataverseName,
+ String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
+ RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
+ List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+ throws AlgebricksException {
+ // we start with the btree
+ Dataset dataset = findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
+ }
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+ int numKeys = primaryKeys.size() + secondaryKeys.size();
+ int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+
+ // generate field permutations
+ int[] fieldPermutation = new int[numKeys + numFilterFields];
+ int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
+ int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+ int i = 0;
+ int j = 0;
+ for (LogicalVariable varKey : secondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ bloomFilterKeyFields[i] = i;
+ i++;
+ }
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ modificationCallbackPrimaryKeyFields[j] = i;
+ i++;
+ j++;
+ }
+ // Filter can only be one field!
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
+ fieldPermutation[numKeys] = idx;
+ }
+
+ // generate field permutations for prev record
+ int[] prevFieldPermutation = new int[numKeys + numFilterFields];
+ int k = 0;
+ for (LogicalVariable varKey : prevSecondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ prevFieldPermutation[k] = idx;
+ k++;
+ }
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ prevFieldPermutation[k] = idx;
+ k++;
+ }
+ // Filter can only be one field!
+ if (numFilterFields > 0) {
+ int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+ prevFieldPermutation[numKeys] = idx;
+ }
+
+ String itemTypeName = dataset.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
+ .getDatatype();
+
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Only record types can be indexed.");
+ }
+
+ ARecordType recType = (ARecordType) itemType;
+
+ // Index parameters.
+ Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ recType, context.getBinaryComparatorFactoryProvider());
+ int[] filterFields = null;
+ int[] btreeFields = null;
+ if (filterTypeTraits != null) {
+ filterFields = new int[1];
+ filterFields[0] = numKeys;
+ btreeFields = new int[numKeys];
+ for (int l = 0; l < btreeFields.length; l++) {
+ btreeFields[l] = l;
+ }
+ }
+
+ List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
+ List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+ ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+ for (i = 0; i < secondaryKeys.size(); ++i) {
+ Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
+ secondaryKeyNames.get(i), recType);
+ IAType keyType = keyPairType.first;
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+ List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+ for (List<String> partitioningKey : partitioningKeys) {
+ IAType keyType = recType.getSubFieldType(partitioningKey);
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
+
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
+ dataverseName, datasetName, indexName, temp);
+
+ // prepare callback
+ JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+ int datasetId = dataset.getDatasetId();
+ TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+ ResourceType.LSM_BTREE)
+ : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+ ResourceType.LSM_BTREE);
+
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, !temp);
+ AsterixLSMTreeUpsertOperatorDescriptor op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+ idfh, filterFactory, false, indexName, null, modificationCallbackFactory,
+ NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
index 8a95f87..03fc4c0 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
@@ -391,7 +391,7 @@ public class MetadataLockManager {
releaseDataverseReadLock(dataverseName);
}
- public void insertDeleteBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
+ public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
List<String> datasets) {
dataverses.add(dataverseName);
datasets.add(datasetFullyQualifiedName);
@@ -420,7 +420,7 @@ public class MetadataLockManager {
}
}
- public void insertDeleteEnd(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
+ public void insertDeleteUpsertEnd(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
List<String> datasets) {
String previous = null;
for (int i = dataverses.size() - 1; i >= 0; i--) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java b/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
index a74f0fa..2b05761 100644
--- a/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
+++ b/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
@@ -32,9 +32,6 @@ import org.apache.hyracks.data.std.util.GrowableArray;
import org.apache.hyracks.storage.am.common.ophelpers.IntArrayList;
public abstract class AbstractListBuilder implements IAsterixListBuilder {
-
- protected static final byte serNullTypeTag = ATypeTag.NULL.serialize();
-
protected final GrowableArray outputStorage;
protected final DataOutputStream outputStream;
protected final IntArrayList offsets;
@@ -80,13 +77,13 @@ public abstract class AbstractListBuilder implements IAsterixListBuilder {
@Override
public void addItem(IValueReference item) throws HyracksDataException {
try {
- if (!fixedSize && (item.getByteArray()[0] != serNullTypeTag || itemTypeTag == ATypeTag.ANY))
+ if (!fixedSize && (item.getByteArray()[0] != ATypeTag.SERIALIZED_NULL_TYPE_TAG || itemTypeTag == ATypeTag.ANY))
this.offsets.add(outputStorage.getLength());
if (itemTypeTag == ATypeTag.ANY
- || (itemTypeTag == ATypeTag.NULL && item.getByteArray()[0] == serNullTypeTag)) {
+ || (itemTypeTag == ATypeTag.NULL && item.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)) {
this.numberOfItems++;
this.outputStream.write(item.getByteArray(), item.getStartOffset(), item.getLength());
- } else if (item.getByteArray()[0] != serNullTypeTag) {
+ } else if (item.getByteArray()[0] != ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
this.numberOfItems++;
this.outputStream.write(item.getByteArray(), item.getStartOffset() + 1, item.getLength() - 1);
}
@@ -110,8 +107,8 @@ public abstract class AbstractListBuilder implements IAsterixListBuilder {
if (!fixedSize) {
offsetPosition += 8;
for (int i = 0; i < offsets.size(); i++) {
- SerializerDeserializerUtil.writeIntToByteArray(offsetArray, offsets.get(i) + metadataInfoSize
- + headerSize, offsetPosition);
+ SerializerDeserializerUtil.writeIntToByteArray(offsetArray,
+ offsets.get(i) + metadataInfoSize + headerSize, offsetPosition);
offsetPosition += 4;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java b/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java
index e3ed6fb..a36238d 100644
--- a/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java
+++ b/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java
@@ -44,8 +44,6 @@ import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerD
public class RecordBuilder implements IARecordBuilder {
private final static int DEFAULT_NUM_OPEN_FIELDS = 10;
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
- private final static byte RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
private final UTF8StringSerializerDeserializer utf8SerDer = new UTF8StringSerializerDeserializer();
private int openPartOffsetArraySize;
@@ -163,7 +161,7 @@ public class RecordBuilder implements IARecordBuilder {
// +1 because we do not store the value tag.
closedPartOutputStream.write(value.getByteArray(), value.getStartOffset() + 1, len);
numberOfClosedFields++;
- if (isNullable && value.getByteArray()[value.getStartOffset()] != SER_NULL_TYPE_TAG) {
+ if (isNullable && value.getByteArray()[value.getStartOffset()] != ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullBitMap[id / 8] |= (byte) (1 << (7 - (id % 8)));
}
}
@@ -173,7 +171,7 @@ public class RecordBuilder implements IARecordBuilder {
// We assume the tag is not included (closed field)
closedPartOutputStream.write(value, 0, value.length);
numberOfClosedFields++;
- if (isNullable && value[0] != SER_NULL_TYPE_TAG) {
+ if (isNullable && value[0] != ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullBitMap[id / 8] |= (byte) (1 << (7 - (id % 8)));
}
}
@@ -255,7 +253,7 @@ public class RecordBuilder implements IARecordBuilder {
// write the record header
if (writeTypeTag) {
- out.writeByte(RECORD_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
}
out.writeInt(recordLength);
if (isOpen) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/AqlNullWriterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/AqlNullWriterFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/AqlNullWriterFactory.java
index ed19224..8fa881a 100644
--- a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/AqlNullWriterFactory.java
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/AqlNullWriterFactory.java
@@ -42,7 +42,7 @@ public class AqlNullWriterFactory implements INullWriterFactory {
@Override
public void writeNull(DataOutput out) throws HyracksDataException {
try {
- out.writeByte(ATypeTag.NULL.serialize());
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
} catch (IOException e) {
throw new HyracksDataException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java
index 6a24cf3..4ce9617 100644
--- a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java
@@ -113,13 +113,13 @@ public class AObjectAscBinaryComparatorFactory implements IBinaryComparatorFacto
// Normally, comparing between NULL and non-NULL values should return UNKNOWN as the result.
// However, at this point, we assume that NULL check between two types is already done.
// Therefore, inside this method, we return an order between two values even if one value is NULL.
- if (b1[s1] == ATypeTag.NULL.serialize()) {
- if (b2[s2] == ATypeTag.NULL.serialize())
+ if (b1[s1] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+ if (b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)
return 0;
else
return -1;
} else {
- if (b2[s2] == ATypeTag.NULL.serialize())
+ if (b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)
return 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java
index 77b8b3a..8907bca 100644
--- a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java
@@ -29,8 +29,8 @@ import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
import org.apache.hyracks.data.std.primitive.DoublePointable;
import org.apache.hyracks.data.std.primitive.FloatPointable;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.data.std.primitive.UTF8StringLowercasePointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
public class ListItemBinaryComparatorFactory implements IBinaryComparatorFactory {
@@ -84,13 +84,13 @@ public class ListItemBinaryComparatorFactory implements IBinaryComparatorFactory
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) throws HyracksDataException {
- if (b1[s1] == ATypeTag.NULL.serialize()) {
- if (b2[s2] == ATypeTag.NULL.serialize())
+ if (b1[s1] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+ if (b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)
return 0;
else
return -1;
} else {
- if (b2[s2] == ATypeTag.NULL.serialize())
+ if (b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)
return 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryBooleanInspectorImpl.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryBooleanInspectorImpl.java b/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryBooleanInspectorImpl.java
index 6d09bf8..247e6fd 100644
--- a/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryBooleanInspectorImpl.java
+++ b/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryBooleanInspectorImpl.java
@@ -34,14 +34,12 @@ public class AqlBinaryBooleanInspectorImpl implements IBinaryBooleanInspector {
}
};
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-
private AqlBinaryBooleanInspectorImpl() {
}
@Override
public boolean getBooleanValue(byte[] bytes, int offset, int length) {
- if (bytes[offset] == SER_NULL_TYPE_TAG)
+ if (bytes[offset] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)
return false;
/** check if the runtime type is boolean */
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[offset]);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryTokenizerFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryTokenizerFactoryProvider.java b/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryTokenizerFactoryProvider.java
index e73f769..52248c8 100644
--- a/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryTokenizerFactoryProvider.java
+++ b/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryTokenizerFactoryProvider.java
@@ -35,10 +35,12 @@ public class AqlBinaryTokenizerFactoryProvider implements IBinaryTokenizerFactor
public static final AqlBinaryTokenizerFactoryProvider INSTANCE = new AqlBinaryTokenizerFactoryProvider();
private static final IBinaryTokenizerFactory aqlStringTokenizer = new DelimitedUTF8StringBinaryTokenizerFactory(
- true, true, new UTF8WordTokenFactory(ATypeTag.STRING.serialize(), ATypeTag.INT32.serialize()));
+ true, true,
+ new UTF8WordTokenFactory(ATypeTag.SERIALIZED_STRING_TYPE_TAG, ATypeTag.SERIALIZED_INT32_TYPE_TAG));
private static final IBinaryTokenizerFactory aqlHashingStringTokenizer = new DelimitedUTF8StringBinaryTokenizerFactory(
- true, true, new HashedUTF8WordTokenFactory(ATypeTag.INT32.serialize(), ATypeTag.INT32.serialize()));
+ true, true,
+ new HashedUTF8WordTokenFactory(ATypeTag.SERIALIZED_INT32_TYPE_TAG, ATypeTag.SERIALIZED_INT32_TYPE_TAG));
private static final IBinaryTokenizerFactory orderedListTokenizer = new AOrderedListBinaryTokenizerFactory(
new AListElementTokenFactory());
@@ -77,7 +79,8 @@ public class AqlBinaryTokenizerFactoryProvider implements IBinaryTokenizerFactor
return null;
} else {
return new NGramUTF8StringBinaryTokenizerFactory(gramLength, usePrePost, true, true,
- new UTF8NGramTokenFactory(ATypeTag.STRING.serialize(), ATypeTag.INT32.serialize()));
+ new UTF8NGramTokenFactory(ATypeTag.SERIALIZED_STRING_TYPE_TAG,
+ ATypeTag.SERIALIZED_INT32_TYPE_TAG));
}
}
case ORDEREDLIST: {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java b/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
index 03e3895..67b62d6 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
@@ -19,6 +19,11 @@
package org.apache.asterix.om.pointables;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.AqlNullWriterFactory;
import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
@@ -35,11 +40,6 @@ import org.apache.asterix.om.util.container.IObjectFactory;
import org.apache.hyracks.api.dataflow.value.INullWriter;
import org.apache.hyracks.util.string.UTF8StringWriter;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* This class interprets the binary data representation of a record. One can
* call getFieldNames, getFieldTypeTags and getFieldValues to get pointable
@@ -52,6 +52,7 @@ public class ARecordVisitablePointable extends AbstractVisitablePointable {
* object pool based allocator, in order to have object reuse
*/
static IObjectFactory<IVisitablePointable, IAType> FACTORY = new IObjectFactory<IVisitablePointable, IAType>() {
+ @Override
public IVisitablePointable create(IAType type) {
return new ARecordVisitablePointable((ARecordType) type);
}
@@ -114,7 +115,7 @@ public class ARecordVisitablePointable extends AbstractVisitablePointable {
// add type name Reference (including a astring type tag)
int nameStart = typeBos.size();
- typeDos.writeByte(ATypeTag.STRING.serialize());
+ typeDos.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
utf8Writer.writeUTF8(fieldNameStrs[i], typeDos);
int nameEnd = typeBos.size();
IVisitablePointable typeNameReference = AFlatValuePointable.FACTORY.create(null);
@@ -183,8 +184,7 @@ public class ARecordVisitablePointable extends AbstractVisitablePointable {
boolean hasNullableFields = NonTaggedFormatUtil.hasNullableField(inputRecType);
if (hasNullableFields) {
nullBitMapOffset = s;
- offsetArrayOffset = s
- + (this.numberOfSchemaFields % 8 == 0 ? numberOfSchemaFields / 8
+ offsetArrayOffset = s + (this.numberOfSchemaFields % 8 == 0 ? numberOfSchemaFields / 8
: numberOfSchemaFields / 8 + 1);
} else {
offsetArrayOffset = s;
@@ -238,7 +238,7 @@ public class ARecordVisitablePointable extends AbstractVisitablePointable {
int fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffset, ATypeTag.STRING,
false);
int fnstart = dataBos.size();
- dataDos.writeByte(ATypeTag.STRING.serialize());
+ dataDos.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
dataDos.write(b, fieldOffset, fieldValueLength);
int fnend = dataBos.size();
IVisitablePointable fieldName = allocator.allocateEmpty();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java b/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
index d89ae6a..d632d70 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
@@ -19,6 +19,14 @@
package org.apache.asterix.om.pointables.cast;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.TypeException;
@@ -47,14 +55,6 @@ import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
import org.apache.hyracks.util.string.UTF8StringWriter;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* This class is to do the runtime type cast for a record. It is ONLY visible to
* ACastVisitor.
@@ -75,8 +75,8 @@ class ARecordCaster {
private final IVisitablePointable nullReference = allocator.allocateEmpty();
private final IVisitablePointable nullTypeTag = allocator.allocateEmpty();
- private final IBinaryComparator fieldNameComparator = PointableBinaryComparatorFactory.of(
- UTF8StringPointable.FACTORY).createBinaryComparator();
+ private final IBinaryComparator fieldNameComparator = PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY).createBinaryComparator();
private final ByteArrayAccessibleOutputStream outputBos = new ByteArrayAccessibleOutputStream();
private final DataOutputStream outputDos = new DataOutputStream(outputBos);
@@ -107,7 +107,7 @@ class ARecordCaster {
int end = bos.size();
nullReference.set(bos.getByteArray(), start, end - start);
start = bos.size();
- dos.write(ATypeTag.NULL.serialize());
+ dos.write(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
end = bos.size();
nullTypeTag.set(bos.getByteArray(), start, end - start);
} catch (IOException e) {
@@ -115,8 +115,8 @@ class ARecordCaster {
}
}
- public void castRecord(ARecordVisitablePointable recordAccessor, IVisitablePointable resultAccessor, ARecordType reqType,
- ACastVisitor visitor) throws IOException, TypeException {
+ public void castRecord(ARecordVisitablePointable recordAccessor, IVisitablePointable resultAccessor,
+ ARecordType reqType, ACastVisitor visitor) throws IOException, TypeException {
List<IVisitablePointable> fieldNames = recordAccessor.getFieldNames();
List<IVisitablePointable> fieldTypeTags = recordAccessor.getFieldTypeTags();
List<IVisitablePointable> fieldValues = recordAccessor.getFieldValues();
@@ -214,15 +214,15 @@ class ARecordCaster {
IVisitablePointable reqFieldTypeTag = reqFieldTypeTags.get(reqFnPos);
if (fieldTypeTag.equals(reqFieldTypeTag) || (
// match the null type of optional field
- optionalFields[reqFnPos] && fieldTypeTag.equals(nullTypeTag))) {
+ optionalFields[reqFnPos] && fieldTypeTag.equals(nullTypeTag))) {
fieldPermutation[reqFnPos] = fnPos;
openFields[fnPos] = false;
} else {
// if mismatch, check whether input type can be promoted to the required type
- ATypeTag inputTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(fieldTypeTag
- .getByteArray()[fieldTypeTag.getStartOffset()]);
- ATypeTag requiredTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(reqFieldTypeTag
- .getByteArray()[reqFieldTypeTag.getStartOffset()]);
+ ATypeTag inputTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(fieldTypeTag.getByteArray()[fieldTypeTag.getStartOffset()]);
+ ATypeTag requiredTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(reqFieldTypeTag.getByteArray()[reqFieldTypeTag.getStartOffset()]);
if (ATypeHierarchy.canPromote(inputTypeTag, requiredTypeTag)
|| ATypeHierarchy.canDemote(inputTypeTag, requiredTypeTag)) {
@@ -255,8 +255,8 @@ class ARecordCaster {
//print the field type
IVisitablePointable fieldType = fieldTypeTags.get(i);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(fieldType.getByteArray()[fieldType
- .getStartOffset()]);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(fieldType.getByteArray()[fieldType.getStartOffset()]);
ps.print(typeTag);
//collect the output message
@@ -281,8 +281,8 @@ class ARecordCaster {
}
private void writeOutput(List<IVisitablePointable> fieldNames, List<IVisitablePointable> fieldTypeTags,
- List<IVisitablePointable> fieldValues, DataOutput output, ACastVisitor visitor) throws IOException,
- AsterixException {
+ List<IVisitablePointable> fieldValues, DataOutput output, ACastVisitor visitor)
+ throws IOException, AsterixException {
// reset the states of the record builder
recBuilder.reset(cachedReqType);
recBuilder.init();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index ee91aab..ac50312 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -216,10 +216,10 @@ public class ARecordPointable extends AbstractPointable {
// Closed field accessors.
// -----------------------
- public void getClosedFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException,
- AsterixException {
+ public void getClosedFieldValue(ARecordType recordType, int fieldId, DataOutput dOut)
+ throws IOException, AsterixException {
if (isClosedFieldNull(recordType, fieldId)) {
- dOut.writeByte(ATypeTag.NULL.serialize());
+ dOut.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
} else {
dOut.write(getClosedFieldTag(recordType, fieldId));
dOut.write(bytes, getClosedFieldOffset(recordType, fieldId), getClosedFieldSize(recordType, fieldId));
@@ -231,7 +231,7 @@ public class ARecordPointable extends AbstractPointable {
}
public void getClosedFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
- dOut.writeByte(ATypeTag.STRING.serialize());
+ dOut.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
utf8Writer.writeUTF8(getClosedFieldName(recordType, fieldId), dOut);
}
@@ -281,8 +281,8 @@ public class ARecordPointable extends AbstractPointable {
// Open field accessors.
// -----------------------
- public void getOpenFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException,
- AsterixException {
+ public void getOpenFieldValue(ARecordType recordType, int fieldId, DataOutput dOut)
+ throws IOException, AsterixException {
dOut.write(bytes, getOpenFieldValueOffset(recordType, fieldId), getOpenFieldValueSize(recordType, fieldId));
}
@@ -297,7 +297,7 @@ public class ARecordPointable extends AbstractPointable {
}
public void getOpenFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
- dOut.writeByte(ATypeTag.STRING.serialize());
+ dOut.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
dOut.write(bytes, getOpenFieldNameOffset(recordType, fieldId), getOpenFieldNameSize(recordType, fieldId));
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java b/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
index 0a341f0..6196c60 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
@@ -24,9 +24,7 @@ import java.util.List;
/**
* There is a unique tag for each primitive type and for each kind of
- * non-primitive type in the object model.
- *
- * @author Nicola
+ * non-primitive type in the object model
*/
public enum ATypeTag implements IEnumSerializer {
INT8(1),
@@ -69,19 +67,39 @@ public enum ATypeTag implements IEnumSerializer {
UUID(38),
SHORTWITHOUTTYPEINFO(40);
- private byte value;
-
- private ATypeTag(int value) {
- this.value = (byte) value;
- }
-
- @Override
- public byte serialize() {
- return value;
- }
-
+ /*
+ * Serialized Tags begin
+ */
+ public static final byte SERIALIZED_STRING_TYPE_TAG = STRING.serialize();
+ public static final byte SERIALIZED_NULL_TYPE_TAG = NULL.serialize();
+ public static final byte SERIALIZED_DOUBLE_TYPE_TAG = DOUBLE.serialize();
+ public static final byte SERIALIZED_RECORD_TYPE_TAG = RECORD.serialize();
+ public static final byte SERIALIZED_INT32_TYPE_TAG = INT32.serialize();
+ public static final byte SERIALIZED_ORDEREDLIST_TYPE_TAG = ORDEREDLIST.serialize();
+ public static final byte SERIALIZED_UNORDEREDLIST_TYPE_TAG = UNORDEREDLIST.serialize();
+ public static final byte SERIALIZED_POLYGON_TYPE_TAG = POLYGON.serialize();
+ public static final byte SERIALIZED_DATE_TYPE_TAG = DATE.serialize();
+ public static final byte SERIALIZED_TIME_TYPE_TAG = TIME.serialize();
+ public static final byte SERIALIZED_DATETIME_TYPE_TAG = DATETIME.serialize();
+ public static final byte SERIALIZED_SYSTEM_NULL_TYPE_TAG = SYSTEM_NULL.serialize();
+ public static final byte SERIALIZED_DURATION_TYPE_TAG = DURATION.serialize();
+ public static final byte SERIALIZED_DAY_TIME_DURATION_TYPE_TAG = DAYTIMEDURATION.serialize();
+ public static final byte SERIALIZED_POINT_TYPE_TAG = POINT.serialize();
+ public static final byte SERIALIZED_INTERVAL_TYPE_TAG = INTERVAL.serialize();
+ public static final byte SERIALIZED_CIRCLE_TYPE_TAG = CIRCLE.serialize();
+ public static final byte SERIALIZED_YEAR_MONTH_DURATION_TYPE_TAG = YEARMONTHDURATION.serialize();
+ public static final byte SERIALIZED_LINE_TYPE_TAG = LINE.serialize();
+ public static final byte SERIALIZED_RECTANGLE_TYPE_TAG = RECTANGLE.serialize();
+ public static final byte SERIALIZED_BOOLEAN_TYPE_TAG = BOOLEAN.serialize();
+ public static final byte SERIALIZED_INT8_TYPE_TAG = INT8.serialize();
+ public static final byte SERIALIZED_INT16_TYPE_TAG = INT16.serialize();
+ public static final byte SERIALIZED_INT64_TYPE_TAG = INT64.serialize();
+ public static final byte SERIALIZED_FLOAT_TYPE_TAG = FLOAT.serialize();
+ /*
+ * Serialized Tags end
+ */
public static final int TYPE_COUNT = ATypeTag.values().length;
-
+ private byte value;
public static final ATypeTag[] VALUE_TYPE_MAPPING;
static {
@@ -96,6 +114,15 @@ public enum ATypeTag implements IEnumSerializer {
VALUE_TYPE_MAPPING = typeList.toArray(new ATypeTag[typeList.size()]);
}
+ private ATypeTag(int value) {
+ this.value = (byte) value;
+ }
+
+ @Override
+ public byte serialize() {
+ return value;
+ }
+
public boolean isDerivedType() {
if (this == ATypeTag.RECORD || this == ATypeTag.ORDEREDLIST || this == ATypeTag.UNORDEREDLIST
|| this == ATypeTag.UNION)