You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:44:02 UTC
[14/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java
deleted file mode 100644
index ffecc8e..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.file;
-
-import java.util.List;
-
-import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
-import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
-import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
-import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
-import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
-import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.entities.Index;
-import edu.uci.ics.asterix.metadata.external.IndexingConstants;
-import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import edu.uci.ics.asterix.transaction.management.resource.ExternalBTreeWithBuddyLocalResourceMetadata;
-import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
-import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
-import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-import edu.uci.ics.hyracks.storage.common.file.LocalResource;
-
-public class SecondaryBTreeOperationsHelper extends SecondaryIndexOperationsHelper {
-
- protected SecondaryBTreeOperationsHelper(PhysicalOptimizationConfig physOptConf,
- IAsterixPropertiesProvider propertiesProvider) {
- super(physOptConf, propertiesProvider);
- }
-
- @Override
- public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
- JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- ILocalResourceFactoryProvider localResourceFactoryProvider;
- IIndexDataflowHelperFactory indexDataflowHelperFactory;
- if (dataset.getDatasetType() == DatasetType.INTERNAL) {
- //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
- ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(secondaryTypeTraits,
- secondaryComparatorFactories, secondaryBloomFilterKeyFields, true, dataset.getDatasetId(),
- mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits, filterCmpFactories,
- secondaryBTreeFields, secondaryFilterFields);
- localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
- LocalResource.LSMBTreeResource);
- // The index create operation should be persistent regardless of temp datasets or permanent dataset.
- indexDataflowHelperFactory = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
- secondaryBTreeFields, secondaryFilterFields, true);
- } else {
- // External dataset local resource and dataflow helper
- int[] buddyBreeFields = new int[] { numSecondaryKeys };
- ILocalResourceMetadata localResourceMetadata = new ExternalBTreeWithBuddyLocalResourceMetadata(
- dataset.getDatasetId(), secondaryComparatorFactories, secondaryTypeTraits, mergePolicyFactory,
- mergePolicyFactoryProperties, buddyBreeFields);
- localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
- LocalResource.ExternalBTreeWithBuddyResource);
- indexDataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory,
- mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), buddyBreeFields,
- ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
- }
- TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- secondaryFileSplitProvider, secondaryTypeTraits, secondaryComparatorFactories,
- secondaryBloomFilterKeyFields, indexDataflowHelperFactory, localResourceFactoryProvider,
- NoOpOperationCallbackFactory.INSTANCE);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
- secondaryPartitionConstraint);
- spec.addRoot(secondaryIndexCreateOp);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
- }
-
- @Override
- public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
- JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-
- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- /*
- * In case of external data, this method is used to build loading jobs for both initial load on index creation
- * and transaction load on dataset referesh
- */
-
- // Create external indexing scan operator
- ExternalDataScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec);
-
- // Assign op.
- AbstractOperatorDescriptor sourceOp = primaryScanOp;
- if (isEnforcingKeyTypes) {
- sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
- spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
- }
- AlgebricksMetaOperatorDescriptor asterixAssignOp = createExternalAssignOp(spec, numSecondaryKeys);
-
- // If any of the secondary fields are nullable, then add a select op that filters nulls.
- AlgebricksMetaOperatorDescriptor selectOp = null;
- if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
- selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
- }
-
- // Sort by secondary keys.
- ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
-
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- // Create secondary BTree bulk load op.
- AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp;
- ExternalBTreeWithBuddyDataflowHelperFactory dataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory(
- mergePolicyFactory, mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), new int[] { numSecondaryKeys },
- ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
- IOperatorDescriptor root;
- if (externalFiles != null) {
- // Transaction load
- secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, numSecondaryKeys, dataflowHelperFactory,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
- root = secondaryBulkLoadOp;
- } else {
- // Initial load
- secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, numSecondaryKeys, dataflowHelperFactory,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
- AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
- new IPushRuntimeFactory[] { new SinkRuntimeFactory() },
- new RecordDescriptor[] { secondaryRecDesc });
- spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
- root = metaOp;
- }
- spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
- if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
- } else {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
- }
- spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
- spec.addRoot(root);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
- } else {
- // Create dummy key provider for feeding the primary index scan.
- AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
-
- // Create primary index scan op.
- BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
-
- // Assign op.
- AbstractOperatorDescriptor sourceOp = primaryScanOp;
- if (isEnforcingKeyTypes) {
- sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
- spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
- }
- AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys);
-
- // If any of the secondary fields are nullable, then add a select op that filters nulls.
- AlgebricksMetaOperatorDescriptor selectOp = null;
- if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
- selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
- }
-
- // Sort by secondary keys.
- ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
-
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- boolean temp = dataset.getDatasetDetails().isTemp();
- // Create secondary BTree bulk load op.
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
- spec,
- numSecondaryKeys,
- new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- mergePolicyFactory, mergePolicyFactoryProperties,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
- .getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
- secondaryBTreeFields, secondaryFilterFields, !temp), GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
-
- AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
- new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] { secondaryRecDesc });
- // Connect the operators.
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
- if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
- } else {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
- }
- spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
- spec.addRoot(metaOp);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
- }
- }
-
- @Override
- protected int getNumSecondaryKeys() {
- return numSecondaryKeys;
- }
-
- @Override
- public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException {
- JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- boolean temp = dataset.getDatasetDetails().isTemp();
- LSMTreeIndexCompactOperatorDescriptor compactOp;
- if (dataset.getDatasetType() == DatasetType.INTERNAL) {
- compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, secondaryBloomFilterKeyFields, new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
- mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
- filterCmpFactories, secondaryBTreeFields, secondaryFilterFields, !temp),
- NoOpOperationCallbackFactory.INSTANCE);
- } else {
- // External dataset
- compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, secondaryBloomFilterKeyFields,
- new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, storageProperties
- .getBloomFilterFalsePositiveRate(), new int[] { numSecondaryKeys },
- ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true),
- NoOpOperationCallbackFactory.INSTANCE);
- }
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
- secondaryPartitionConstraint);
- spec.addRoot(compactOp);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- protected void setSecondaryRecDescAndComparators(IndexType indexType, List<List<String>> secondaryKeyFields,
- List<IAType> secondaryKeyTypes, int gramLength, AqlMetadataProvider metadataProvider)
- throws AlgebricksException, AsterixException {
- secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys + numFilterFields];
- secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
- secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
- ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys
- + numFilterFields];
- ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
- secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
- ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
- ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
- IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
- .getBinaryComparatorFactoryProvider();
- // Record column is 0 for external datasets, numPrimaryKeys for internal ones
- int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0;
- for (int i = 0; i < numSecondaryKeys; i++) {
- secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
- isEnforcingKeyTypes ? enforcedItemType : itemType, secondaryKeyFields.get(i), recordColumn);
- Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
- secondaryKeyFields.get(i), itemType);
- IAType keyType = keyTypePair.first;
- anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
- ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
- secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
- secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
- secondaryBloomFilterKeyFields[i] = i;
- }
- if (dataset.getDatasetType() == DatasetType.INTERNAL) {
- // Add serializers and comparators for primary index fields.
- for (int i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
- enforcedRecFields[i] = primaryRecDesc.getFields()[i];
- secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
- enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
- secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
- }
- } else {
- // Add serializers and comparators for RID fields.
- for (int i = 0; i < numPrimaryKeys; i++) {
- secondaryRecFields[numSecondaryKeys + i] = IndexingConstants.getSerializerDeserializer(i);
- enforcedRecFields[i] = IndexingConstants.getSerializerDeserializer(i);
- secondaryTypeTraits[numSecondaryKeys + i] = IndexingConstants.getTypeTraits(i);
- enforcedTypeTraits[i] = IndexingConstants.getTypeTraits(i);
- secondaryComparatorFactories[numSecondaryKeys + i] = IndexingConstants.getComparatorFactory(i);
- }
- }
- enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType);
-
- if (numFilterFields > 0) {
- secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat()
- .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys);
- Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
- IAType type = keyTypePair.first;
- ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
- secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
- }
-
- secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
- enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java
deleted file mode 100644
index 07c8bab..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.asterix.file;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
-import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
-import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
-import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
-import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider;
-import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
-import edu.uci.ics.asterix.common.transactions.JobId;
-import edu.uci.ics.asterix.external.indexing.operators.ExternalIndexBulkModifyOperatorDescriptor;
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.external.IndexingConstants;
-import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
-import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.CastRecordDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.IsNullDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
-import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
-import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
-import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-
-@SuppressWarnings("rawtypes")
-// TODO: We should eventually have a hierarchy of classes that can create all
-// possible index job specs,
-// not just for creation.
-public abstract class SecondaryIndexOperationsHelper {
- protected final PhysicalOptimizationConfig physOptConf;
-
- protected int numPrimaryKeys;
- protected int numSecondaryKeys;
- protected AqlMetadataProvider metadataProvider;
- protected String dataverseName;
- protected String datasetName;
- protected Dataset dataset;
- protected ARecordType itemType;
- protected ISerializerDeserializer payloadSerde;
- protected IFileSplitProvider primaryFileSplitProvider;
- protected AlgebricksPartitionConstraint primaryPartitionConstraint;
- protected IFileSplitProvider secondaryFileSplitProvider;
- protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
- protected String secondaryIndexName;
- protected boolean anySecondaryKeyIsNullable = false;
- protected boolean isEnforcingKeyTypes = false;
-
- protected long numElementsHint;
- protected IBinaryComparatorFactory[] primaryComparatorFactories;
- protected int[] primaryBloomFilterKeyFields;
- protected RecordDescriptor primaryRecDesc;
- protected IBinaryComparatorFactory[] secondaryComparatorFactories;
- protected ITypeTraits[] secondaryTypeTraits;
- protected int[] secondaryBloomFilterKeyFields;
- protected RecordDescriptor secondaryRecDesc;
- protected ICopyEvaluatorFactory[] secondaryFieldAccessEvalFactories;
-
- protected IAsterixPropertiesProvider propertiesProvider;
- protected ILSMMergePolicyFactory mergePolicyFactory;
- protected Map<String, String> mergePolicyFactoryProperties;
- protected RecordDescriptor enforcedRecDesc;
- protected ARecordType enforcedItemType;
-
- protected int numFilterFields;
- protected List<String> filterFieldName;
- protected ITypeTraits[] filterTypeTraits;
- protected IBinaryComparatorFactory[] filterCmpFactories;
- protected int[] secondaryFilterFields;
- protected int[] primaryFilterFields;
- protected int[] primaryBTreeFields;
- protected int[] secondaryBTreeFields;
- protected List<ExternalFile> externalFiles;
-
- // Prevent public construction. Should be created via createIndexCreator().
- protected SecondaryIndexOperationsHelper(PhysicalOptimizationConfig physOptConf,
- IAsterixPropertiesProvider propertiesProvider) {
- this.physOptConf = physOptConf;
- this.propertiesProvider = propertiesProvider;
- }
-
- public static SecondaryIndexOperationsHelper createIndexOperationsHelper(IndexType indexType, String dataverseName,
- String datasetName, String indexName, List<List<String>> secondaryKeyFields,
- List<IAType> secondaryKeyTypes, boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
- PhysicalOptimizationConfig physOptConf, ARecordType recType, ARecordType enforcedType)
- throws AsterixException, AlgebricksException {
- IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
- SecondaryIndexOperationsHelper indexOperationsHelper = null;
- switch (indexType) {
- case BTREE: {
- indexOperationsHelper = new SecondaryBTreeOperationsHelper(physOptConf, asterixPropertiesProvider);
- break;
- }
- case RTREE: {
- indexOperationsHelper = new SecondaryRTreeOperationsHelper(physOptConf, asterixPropertiesProvider);
- break;
- }
- case SINGLE_PARTITION_WORD_INVIX:
- case SINGLE_PARTITION_NGRAM_INVIX:
- case LENGTH_PARTITIONED_WORD_INVIX:
- case LENGTH_PARTITIONED_NGRAM_INVIX: {
- indexOperationsHelper = new SecondaryInvertedIndexOperationsHelper(physOptConf,
- asterixPropertiesProvider);
- break;
- }
- default: {
- throw new AsterixException("Unknown Index Type: " + indexType);
- }
- }
- indexOperationsHelper.init(indexType, dataverseName, datasetName, indexName, secondaryKeyFields,
- secondaryKeyTypes, isEnforced, gramLength, metadataProvider, recType, enforcedType);
- return indexOperationsHelper;
- }
-
- public abstract JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException;
-
- public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;
-
- public abstract JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException;
-
- protected void init(IndexType indexType, String dvn, String dsn, String in, List<List<String>> secondaryKeyFields,
- List<IAType> secondaryKeyTypes, boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
- ARecordType aRecType, ARecordType enforcedType) throws AsterixException, AlgebricksException {
- this.metadataProvider = metadataProvider;
- dataverseName = dvn == null ? metadataProvider.getDefaultDataverseName() : dvn;
- datasetName = dsn;
- secondaryIndexName = in;
- isEnforcingKeyTypes = isEnforced;
- dataset = metadataProvider.findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AsterixException("Unknown dataset " + datasetName);
- }
- boolean temp = dataset.getDatasetDetails().isTemp();
- itemType = aRecType;
- enforcedItemType = enforcedType;
- payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
- numSecondaryKeys = secondaryKeyFields.size();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, secondaryIndexName, temp);
- secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
- secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
-
- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- numPrimaryKeys = ExternalIndexingOperations.getRIDSize(dataset);
- } else {
- filterFieldName = DatasetUtils.getFilterField(dataset);
- if (filterFieldName != null) {
- numFilterFields = 1;
- } else {
- numFilterFields = 0;
- }
-
- numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
- primaryFileSplitProvider = primarySplitsAndConstraint.first;
- primaryPartitionConstraint = primarySplitsAndConstraint.second;
- setPrimaryRecDescAndComparators();
- }
- setSecondaryRecDescAndComparators(indexType, secondaryKeyFields, secondaryKeyTypes, gramLength,
- metadataProvider);
- numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
- metadataProvider.getMetadataTxnContext());
- mergePolicyFactory = compactionInfo.first;
- mergePolicyFactoryProperties = compactionInfo.second;
-
- if (numFilterFields > 0) {
- setFilterTypeTraitsAndComparators();
- }
- }
-
- protected void setFilterTypeTraitsAndComparators() throws AlgebricksException {
- filterTypeTraits = new ITypeTraits[numFilterFields];
- filterCmpFactories = new IBinaryComparatorFactory[numFilterFields];
- secondaryFilterFields = new int[numFilterFields];
- primaryFilterFields = new int[numFilterFields];
- primaryBTreeFields = new int[numPrimaryKeys + 1];
- secondaryBTreeFields = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < primaryBTreeFields.length; i++) {
- primaryBTreeFields[i] = i;
- }
- for (int i = 0; i < secondaryBTreeFields.length; i++) {
- secondaryBTreeFields[i] = i;
- }
-
- IAType type;
- try {
- type = itemType.getSubFieldType(filterFieldName);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- filterCmpFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(type, true);
- filterTypeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(type);
- secondaryFilterFields[0] = getNumSecondaryKeys() + numPrimaryKeys;
- primaryFilterFields[0] = numPrimaryKeys + 1;
- }
-
- protected abstract int getNumSecondaryKeys();
-
- protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
- List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
- int numPrimaryKeys = partitioningKeys.size();
- ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
- ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
- primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
- primaryBloomFilterKeyFields = new int[numPrimaryKeys];
- ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
- for (int i = 0; i < numPrimaryKeys; i++) {
- IAType keyType;
- try {
- keyType = itemType.getSubFieldType(partitioningKeys.get(i));
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
- primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, true);
- primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- primaryBloomFilterKeyFields[i] = i;
- }
- primaryRecFields[numPrimaryKeys] = payloadSerde;
- primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
- primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
- }
-
- protected abstract void setSecondaryRecDescAndComparators(IndexType indexType,
- List<List<String>> secondaryKeyFields, List<IAType> secondaryKeyTypes, int gramLength,
- AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException;
-
- protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AsterixException,
- AlgebricksException {
- // Build dummy tuple containing one field with a dummy value inside.
- ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- try {
- // Serialize dummy value into a field.
- IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
- } catch (HyracksDataException e) {
- throw new AsterixException(e);
- }
- // Add dummy field.
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
- primaryPartitionConstraint);
- return keyProviderOp;
- }
-
- protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
- // -Infinity
- int[] lowKeyFields = null;
- // +Infinity
- int[] highKeyFields = null;
- ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- JobId jobId = JobIdFactory.generateJobId();
- metadataProvider.setJobId(jobId);
- boolean isWriteTransaction = metadataProvider.isWriteTransaction();
- IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
- spec.setJobletEventListenerFactory(jobEventListenerFactory);
-
- boolean temp = dataset.getDatasetDetails().isTemp();
- ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
- : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(),
- primaryBloomFilterKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
- primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
- new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- mergePolicyFactory, mergePolicyFactoryProperties, new PrimaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
- .getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
- primaryBTreeFields, primaryFilterFields, !temp), false, false, null,
- searchCallbackFactory, null, null);
-
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
- primaryPartitionConstraint);
- return primarySearchOp;
- }
-
- protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec,
- AbstractOperatorDescriptor primaryScanOp, int numSecondaryKeyFields) throws AlgebricksException {
- int[] outColumns = new int[numSecondaryKeyFields + numFilterFields];
- int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
- for (int i = 0; i < numSecondaryKeyFields + numFilterFields; i++) {
- outColumns[i] = numPrimaryKeys + i;
- }
- int projCount = 0;
- for (int i = 0; i < numSecondaryKeyFields; i++) {
- projectionList[projCount++] = numPrimaryKeys + i;
- }
- for (int i = 0; i < numPrimaryKeys; i++) {
- projectionList[projCount++] = i;
- }
- if (numFilterFields > 0) {
- projectionList[projCount++] = numPrimaryKeys + numSecondaryKeyFields;
- }
-
- IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
- for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
- sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
- secondaryFieldAccessEvalFactories[i]);
- }
- AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
- AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
- primaryPartitionConstraint);
- return asterixAssignOp;
- }
-
- protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification spec,
- AbstractOperatorDescriptor primaryScanOp, int numSecondaryKeyFields, DatasetType dsType) {
- CastRecordDescriptor castFuncDesc = (CastRecordDescriptor) CastRecordDescriptor.FACTORY
- .createFunctionDescriptor();
- castFuncDesc.reset(enforcedItemType, itemType);
-
- int[] outColumns = new int[1];
- int[] projectionList = new int[1 + numPrimaryKeys];
- int recordIdx;
- //external datascan operator returns a record as the first field, instead of the last in internal case
- if (dsType == DatasetType.EXTERNAL) {
- recordIdx = 0;
- outColumns[0] = 0;
- } else {
- recordIdx = numPrimaryKeys;
- outColumns[0] = numPrimaryKeys;
- }
- for (int i = 0; i <= numPrimaryKeys; i++) {
- projectionList[i] = i;
- }
- ICopyEvaluatorFactory[] castEvalFact = new ICopyEvaluatorFactory[] { new ColumnAccessEvalFactory(recordIdx) };
- IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[1];
- sefs[0] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
- castFuncDesc.createEvaluatorFactory(castEvalFact));
- AssignRuntimeFactory castAssign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
- AlgebricksMetaOperatorDescriptor castRecAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { castAssign }, new RecordDescriptor[] { enforcedRecDesc });
-
- return castRecAssignOp;
- }
-
- protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
- IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
- int[] sortFields = new int[secondaryComparatorFactories.length];
- for (int i = 0; i < secondaryComparatorFactories.length; i++) {
- sortFields[i] = i;
- }
- ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
- physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
- return sortOp;
- }
-
- protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
- int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
- throws MetadataException, AlgebricksException {
- int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
- for (int i = 0; i < fieldPermutation.length; i++) {
- fieldPermutation[i] = i;
- }
- TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
- secondaryRecDesc, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
- secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields,
- fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
- secondaryPartitionConstraint);
- return treeIndexBulkLoadOp;
- }
-
- public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields)
- throws AlgebricksException {
- ICopyEvaluatorFactory[] andArgsEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeyFields];
- NotDescriptor notDesc = new NotDescriptor();
- IsNullDescriptor isNullDesc = new IsNullDescriptor();
- for (int i = 0; i < numSecondaryKeyFields; i++) {
- // Access column i, and apply 'is not null'.
- ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
- ICopyEvaluatorFactory isNullEvalFactory = isNullDesc
- .createEvaluatorFactory(new ICopyEvaluatorFactory[] { columnAccessEvalFactory });
- ICopyEvaluatorFactory notEvalFactory = notDesc
- .createEvaluatorFactory(new ICopyEvaluatorFactory[] { isNullEvalFactory });
- andArgsEvalFactories[i] = notEvalFactory;
- }
- ICopyEvaluatorFactory selectCond = null;
- if (numSecondaryKeyFields > 1) {
- // Create conjunctive condition where all secondary index keys must
- // satisfy 'is not null'.
- AndDescriptor andDesc = new AndDescriptor();
- selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
- } else {
- selectCond = andArgsEvalFactories[0];
- }
- StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(
- new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(selectCond),
- null, AqlBinaryBooleanInspectorImpl.FACTORY, false, -1, null);
- AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { secondaryRecDesc });
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixSelectOp,
- primaryPartitionConstraint);
- return asterixSelectOp;
- }
-
- // This method creates a source indexing operator for external data
- protected ExternalDataScanOperatorDescriptor createExternalIndexingOp(JobSpecification spec)
- throws AlgebricksException, AsterixException {
- // A record + primary keys
- ISerializerDeserializer[] serdes = new ISerializerDeserializer[1 + numPrimaryKeys];
- ITypeTraits[] typeTraits = new ITypeTraits[1 + numPrimaryKeys];
- // payload serde and type traits for the record slot
- serdes[0] = payloadSerde;
- typeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
- // serdes and type traits for rid fields
- for (int i = 1; i < serdes.length; i++) {
- serdes[i] = IndexingConstants.getSerializerDeserializer(i - 1);
- typeTraits[i] = IndexingConstants.getTypeTraits(i - 1);
- }
- // output record desc
- RecordDescriptor indexerDesc = new RecordDescriptor(serdes, typeTraits);
-
- // Create the operator and its partition constraits
- Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> indexingOpAndConstraints;
- try {
- indexingOpAndConstraints = ExternalIndexingOperations.createExternalIndexingOp(spec, metadataProvider,
- dataset, itemType, indexerDesc, externalFiles);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
- indexingOpAndConstraints.second);
-
- // Set the primary partition constraints to this partition constraints
- primaryPartitionConstraint = indexingOpAndConstraints.second;
- return indexingOpAndConstraints.first;
- }
-
- protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec, int numSecondaryKeys)
- throws AlgebricksException {
- int[] outColumns = new int[numSecondaryKeys];
- int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeys; i++) {
- outColumns[i] = i + numPrimaryKeys + 1;
- projectionList[i] = i + numPrimaryKeys + 1;
- }
-
- IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
- for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
- sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
- secondaryFieldAccessEvalFactories[i]);
- }
- //add External RIDs to the projection list
- for (int i = 0; i < numPrimaryKeys; i++) {
- projectionList[numSecondaryKeys + i] = i + 1;
- }
-
- AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
- AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
- new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
- return asterixAssignOp;
- }
-
- protected ExternalIndexBulkModifyOperatorDescriptor createExternalIndexBulkModifyOp(JobSpecification spec,
- int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
- throws MetadataException, AlgebricksException {
- int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
- for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
- fieldPermutation[i] = i;
- }
- // create a list of file ids
- int numOfDeletedFiles = 0;
- for (ExternalFile file : externalFiles) {
- if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP)
- numOfDeletedFiles++;
- }
- int[] deletedFiles = new int[numOfDeletedFiles];
- int i = 0;
- for (ExternalFile file : externalFiles) {
- if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
- deletedFiles[i] = file.getFileNumber();
- }
- }
- ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new ExternalIndexBulkModifyOperatorDescriptor(
- spec, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits,
- secondaryComparatorFactories, secondaryBloomFilterKeyFields, dataflowHelperFactory,
- NoOpOperationCallbackFactory.INSTANCE, deletedFiles, fieldPermutation, fillFactor, numElementsHint);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
- secondaryPartitionConstraint);
- return treeIndexBulkLoadOp;
- }
-
- public List<ExternalFile> getExternalFiles() {
- return externalFiles;
- }
-
- public void setExternalFiles(List<ExternalFile> externalFiles) {
- this.externalFiles = externalFiles;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexOperationsHelper.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexOperationsHelper.java
deleted file mode 100644
index 74c4256..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexOperationsHelper.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.file;
-
-import java.util.List;
-
-import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
-import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
-import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
-import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
-import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.entities.Index;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
-import edu.uci.ics.asterix.runtime.formats.FormatUtils;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import edu.uci.ics.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata;
-import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
-import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCompactOperator;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-import edu.uci.ics.hyracks.storage.common.file.LocalResource;
-
-public class SecondaryInvertedIndexOperationsHelper extends SecondaryIndexOperationsHelper {
-
- private IAType secondaryKeyType;
- private ITypeTraits[] invListsTypeTraits;
- private IBinaryComparatorFactory[] tokenComparatorFactories;
- private ITypeTraits[] tokenTypeTraits;
- private IBinaryTokenizerFactory tokenizerFactory;
- // For tokenization, sorting and loading. Represents <token, primary keys>.
- private int numTokenKeyPairFields;
- private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories;
- private RecordDescriptor tokenKeyPairRecDesc;
- private boolean isPartitioned;
- private int[] invertedIndexFields;
- private int[] invertedIndexFieldsForNonBulkLoadOps;
- private int[] secondaryFilterFieldsForNonBulkLoadOps;
-
/**
 * Creates an uninitialized helper for any of the inverted-index types. All state is computed
 * later by init().
 *
 * @param physOptConf physical optimization settings
 * @param propertiesProvider source of AsterixDB properties
 */
protected SecondaryInvertedIndexOperationsHelper(PhysicalOptimizationConfig physOptConf,
        IAsterixPropertiesProvider propertiesProvider) {
    super(physOptConf, propertiesProvider);
}
-
/**
 * Validates that the inverted index targets a single field of a dataset with at most one primary
 * key, then derives the descriptors used by the job builders:
 * <ul>
 * <li>the secondary record descriptor and field-access evaluators (for the assign/select ops);</li>
 * <li>the token comparators, token type traits and tokenizer factory;</li>
 * <li>the inverted-list type traits;</li>
 * <li>the enforced-type record descriptor (consumed by createCastOp);</li>
 * <li>the token/primary-key-pair descriptor used for tokenization, sorting and bulk loading.</li>
 * </ul>
 * When the dataset has a filter field, the filter-related field position arrays are set as well.
 *
 * @param metadata not referenced in this method; the default format from FormatUtils is used instead
 * @throws AsterixException if the primary key is composite or more than one secondary key field is given
 */
@Override
@SuppressWarnings("rawtypes")
protected void setSecondaryRecDescAndComparators(IndexType indexType, List<List<String>> secondaryKeyFields,
        List<IAType> secondaryKeyTypes, int gramLength, AqlMetadataProvider metadata) throws AlgebricksException,
        AsterixException {
    // Sanity checks.
    if (numPrimaryKeys > 1) {
        throw new AsterixException("Cannot create inverted index on dataset with composite primary key.");
    }
    if (numSecondaryKeys > 1) {
        throw new AsterixException("Cannot create composite inverted index on multiple fields.");
    }
    // Length-partitioned variants add a partitioning field to every token (see below).
    if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
            || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
        isPartitioned = true;
    } else {
        isPartitioned = false;
    }
    // Prepare record descriptor used in the assign op, and the optional
    // select op.
    // Secondary tuple slots: [0] secondary key, then primary keys, then the filter field (if any).
    secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys + numFilterFields];
    ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys
            + numFilterFields];
    // NOTE(review): enforcedRecFields has room for the filter field but enforcedTypeTraits does not,
    // and neither gets a filter entry below — confirm the size mismatch is intended.
    ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
    secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
    ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
    ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
    ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
    if (numSecondaryKeys > 0) {
        // The key accessor reads from the enforced record type when key types are enforced.
        secondaryFieldAccessEvalFactories[0] = FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory(
                isEnforcingKeyTypes ? enforcedItemType : itemType, secondaryKeyFields.get(0), numPrimaryKeys);
        // keyTypePair.second is OR-ed into anySecondaryKeyIsNullable, so it presumably reports
        // whether the key may be null — confirm against Index.getNonNullableOpenFieldType.
        Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
                secondaryKeyFields.get(0), itemType);
        secondaryKeyType = keyTypePair.first;
        anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
        ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(secondaryKeyType);
        secondaryRecFields[0] = keySerde;
        secondaryTypeTraits[0] = typeTraitProvider.getTypeTrait(secondaryKeyType);
    }
    // NOTE(review): the primary-key slots of secondaryRecFields and secondaryTypeTraits are never
    // assigned in this method and stay null — confirm nothing downstream reads them.
    if (numFilterFields > 0) {
        // Unlike the key accessor, the filter accessor always reads from itemType.
        secondaryFieldAccessEvalFactories[numSecondaryKeys] = FormatUtils.getDefaultFormat()
                .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys);
        Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
        IAType type = keyTypePair.first;
        ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
        secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
    }
    secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
    // Comparators and type traits for tokens.
    // NOTE(review): secondaryKeyType is only assigned when numSecondaryKeys > 0; with zero keys the
    // calls below would receive null.
    int numTokenFields = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1;
    tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
    tokenTypeTraits = new ITypeTraits[numTokenFields];
    tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
    tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
    if (isPartitioned) {
        // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
        tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
        tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
    }
    // Set tokenizer factory.
    // TODO: We might want to expose the hashing option at the AQL level,
    // and add the choice to the index metadata.
    tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), indexType,
            gramLength);
    // Type traits for inverted-list elements. Inverted lists contain
    // primary keys.
    invListsTypeTraits = new ITypeTraits[numPrimaryKeys];
    if (numPrimaryKeys > 0) {
        invListsTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
        enforcedRecFields[0] = primaryRecDesc.getFields()[0];
        enforcedTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
    }
    // Enforced layout: primary key, then the record. Only the serde is set for the record slot;
    // enforcedTypeTraits[numPrimaryKeys] is left null.
    enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType);
    enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
    // For tokenization, sorting and loading.
    // One token (+ optional partitioning field) + primary keys.
    numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys;
    ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields
            + numFilterFields];
    ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
    tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
    tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
    tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
    tokenKeyPairComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
    // pkOff is the position of the primary key: right after the token, or after the partitioning field.
    int pkOff = 1;
    if (isPartitioned) {
        tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE;
        tokenKeyPairTypeTraits[1] = tokenTypeTraits[1];
        tokenKeyPairComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
        pkOff = 2;
    }
    if (numPrimaryKeys > 0) {
        tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0];
        tokenKeyPairTypeTraits[pkOff] = primaryRecDesc.getTypeTraits()[0];
        tokenKeyPairComparatorFactories[pkOff] = primaryComparatorFactories[0];
    }
    // The filter field (if any) trails the primary keys and reuses the serde computed above.
    if (numFilterFields > 0) {
        tokenKeyPairFields[numPrimaryKeys + pkOff] = secondaryRecFields[numPrimaryKeys + numSecondaryKeys];
    }
    tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
    if (filterFieldName != null) {
        // Bulk-load layout: identity over the token-key-pair fields.
        invertedIndexFields = new int[numTokenKeyPairFields];
        for (int i = 0; i < invertedIndexFields.length; i++) {
            invertedIndexFields[i] = i;
        }
        // Non-bulk-load layout: secondary key, primary keys, then the filter field.
        secondaryFilterFieldsForNonBulkLoadOps = new int[numFilterFields];
        secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + numPrimaryKeys;
        invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + numPrimaryKeys];
        for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; i++) {
            invertedIndexFieldsForNonBulkLoadOps[i] = i;
        }
    }

}
-
- /**
- * Returns how many token/key-pair fields are not primary keys. Given how numTokenKeyPairFields is
- * computed (1 + numPrimaryKeys, or 2 + numPrimaryKeys when partitioned), this is the token field
- * plus, for a partitioned index, the partitioning field.
- */
- @Override
- protected int getNumSecondaryKeys() {
- int nonPrimaryKeyFields = numTokenKeyPairFields - numPrimaryKeys;
- return nonPrimaryKeyFields;
- }
-
- /**
- * Builds the job that creates the (possibly partitioned) LSM inverted index on every secondary
- * partition.
- *
- * @return a job specification whose single root is the index-create operator
- */
- @Override
- public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
- JobSpecification jobSpec = JobSpecificationUtils.createJobSpecification();
-
- // Metadata persisted in each NC's local resource repository describing the new index.
- ILocalResourceMetadata resourceMetadata = new LSMInvertedIndexLocalResourceMetadata(invListsTypeTraits,
- primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory, isPartitioned,
- dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits,
- filterCmpFactories, invertedIndexFields, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
- invertedIndexFieldsForNonBulkLoadOps);
- ILocalResourceFactoryProvider resourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
- resourceMetadata, LocalResource.LSMInvertedIndexResource);
- IIndexDataflowHelperFactory helperFactory = createDataflowHelperFactory();
-
- LSMInvertedIndexCreateOperatorDescriptor createOp = new LSMInvertedIndexCreateOperatorDescriptor(jobSpec,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, helperFactory,
- resourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, createOp,
- secondaryPartitionConstraint);
-
- jobSpec.addRoot(createOp);
- jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return jobSpec;
- }
-
- @Override
- public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
- JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-
- // Create dummy key provider for feeding the primary index scan.
- AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
-
- // Create primary index scan op.
- BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
-
- AbstractOperatorDescriptor sourceOp = primaryScanOp;
- if (isEnforcingKeyTypes) {
- sourceOp = createCastOp(spec, primaryScanOp, numSecondaryKeys, dataset.getDatasetType());
- spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
- }
- AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, numSecondaryKeys);
-
- // If any of the secondary fields are nullable, then add a select op
- // that filters nulls.
- AlgebricksMetaOperatorDescriptor selectOp = null;
- if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
- selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
- }
-
- // Create a tokenizer op.
- AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec);
-
- // Sort by token + primary keys.
- ExternalSortOperatorDescriptor sortOp = createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc);
-
- // Create secondary inverted index bulk load op.
- LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec);
-
- AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
- new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
- // Connect the operators.
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
- if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, tokenizerOp, 0);
- } else {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
- }
- spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, invIndexBulkLoadOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, metaOp, 0);
- spec.addRoot(metaOp);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return spec;
- }
-
- private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException {
- int docField = 0;
- int[] primaryKeyFields = new int[numPrimaryKeys + numFilterFields];
- for (int i = 0; i < primaryKeyFields.length; i++) {
- primaryKeyFields[i] = numSecondaryKeys + i;
- }
- BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
- tokenKeyPairRecDesc, tokenizerFactory, docField, primaryKeyFields, isPartitioned, false);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
- primaryPartitionConstraint);
- return tokenizerOp;
- }
-
- /**
- * Creates the external sort that orders the tokenizer output on all token/key-pair fields: the
- * token, the optional partitioning field, and the primary keys.
- *
- * @param spec job specification the operator is added to
- * @param secondaryComparatorFactories comparators for the sort fields, one per token/key-pair field
- * @param secondaryRecDesc record descriptor of the tuples being sorted
- * @return the sort operator, constrained to the primary partitions
- */
- @Override
- protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
- IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
- // Sort on token and primary keys.
- int[] sortFields = new int[numTokenKeyPairFields];
- for (int i = 0; i < numTokenKeyPairFields; i++) {
- sortFields[i] = i;
- }
- // Use the comparators the caller supplied, rather than ignoring the parameter and reading a field.
- ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
- physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
- return sortOp;
- }
-
- /**
- * Creates the operator that bulk-loads the sorted token/key pairs into the inverted index. The field
- * permutation is the identity over the token/key-pair fields followed by any filter fields. The
- * operator runs under the secondary partition constraint.
- */
- private LSMInvertedIndexBulkLoadOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) {
- int permutationLength = numTokenKeyPairFields + numFilterFields;
- int[] fieldPermutation = new int[permutationLength];
- for (int field = 0; field < permutationLength; field++) {
- fieldPermutation[field] = field;
- }
- IIndexDataflowHelperFactory helperFactory = createDataflowHelperFactory();
- LSMInvertedIndexBulkLoadOperatorDescriptor bulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
- spec, secondaryRecDesc, fieldPermutation, false, numElementsHint, false,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, helperFactory);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, bulkLoadOp,
- secondaryPartitionConstraint);
- return bulkLoadOp;
- }
-
- /**
- * Builds the dataflow helper factory used by the create, bulk-load and compact operators. A
- * partitioned index gets a PartitionedLSMInvertedIndexDataflowHelperFactory, and any other index
- * gets an LSMInvertedIndexDataflowHelperFactory. Both receive the same arguments, and the final
- * boolean argument is true unless the dataset is temporary.
- */
- private IIndexDataflowHelperFactory createDataflowHelperFactory() {
- AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
- boolean notTemp = !dataset.getDatasetDetails().isTemp();
- // Arguments shared by both factory variants, built in the same order as before.
- AsterixVirtualBufferCacheProvider vbcProvider = new AsterixVirtualBufferCacheProvider(dataset.getDatasetId());
- SecondaryIndexOperationTrackerProvider opTrackerProvider = new SecondaryIndexOperationTrackerProvider(
- dataset.getDatasetId());
- if (isPartitioned) {
- return new PartitionedLSMInvertedIndexDataflowHelperFactory(vbcProvider, mergePolicyFactory,
- mergePolicyFactoryProperties, opTrackerProvider, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
- filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
- invertedIndexFieldsForNonBulkLoadOps, notTemp);
- }
- return new LSMInvertedIndexDataflowHelperFactory(vbcProvider, mergePolicyFactory,
- mergePolicyFactoryProperties, opTrackerProvider, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
- filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
- invertedIndexFieldsForNonBulkLoadOps, notTemp);
- }
-
- /**
- * Builds the job that compacts the inverted index on every secondary partition.
- *
- * @return a job specification whose single root is the compaction operator
- */
- @Override
- public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException {
- JobSpecification jobSpec = JobSpecificationUtils.createJobSpecification();
- IIndexDataflowHelperFactory helperFactory = createDataflowHelperFactory();
-
- LSMInvertedIndexCompactOperator compactionOp = new LSMInvertedIndexCompactOperator(jobSpec,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, helperFactory,
- NoOpOperationCallbackFactory.INSTANCE);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, compactionOp,
- secondaryPartitionConstraint);
-
- jobSpec.addRoot(compactionOp);
- jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- return jobSpec;
- }
-}