Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/09/02 13:56:44 UTC
[2/3] asterixdb git commit: ASTERIXDB-1238: Refactor AqlMetadataProvider
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/55a558f2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 72360b6..0975163 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -68,17 +68,14 @@ import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetCardinalityHint;
-import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
@@ -123,11 +120,9 @@ import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
import org.apache.hyracks.algebricks.data.IAWriterFactory;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
@@ -174,23 +169,27 @@ import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
+ private final AsterixStorageProperties storageProperties;
+ private final ILibraryManager libraryManager;
+ private final Dataverse defaultDataverse;
+
private MetadataTransactionContext mdTxnCtx;
private boolean isWriteTransaction;
- private final Map<String, String[]> stores;
private Map<String, String> config;
private IAWriterFactory writerFactory;
private FileSplit outputFile;
private boolean asyncResults;
private ResultSetId resultSetId;
private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
-
- private final Dataverse defaultDataverse;
private JobId jobId;
private Map<String, Integer> locks;
private boolean isTemporaryDatasetWriteJob = true;
- private final AsterixStorageProperties storageProperties;
- private final ILibraryManager libraryManager;
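+ // Storage properties and the library manager are resolved once from the shared application context.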
+ public AqlMetadataProvider(Dataverse defaultDataverse) {
+ this.defaultDataverse = defaultDataverse;
+ this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ this.libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
+ }
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
@@ -200,21 +199,10 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
this.config = config;
}
- public Map<String, String[]> getAllStores() {
- return stores;
- }
-
public Map<String, String> getConfig() {
return config;
}
- public AqlMetadataProvider(Dataverse defaultDataverse) {
- this.defaultDataverse = defaultDataverse;
- this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
- this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
- this.libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
- }
-
public ILibraryManager getLibraryManager() {
return libraryManager;
}
@@ -283,43 +271,106 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
return resultSerializerFactoryProvider;
}
+ public boolean isWriteTransaction() {
+ // The transaction writes persistent datasets.
+ return isWriteTransaction;
+ }
+
+ public boolean isTemporaryDatasetWriteJob() {
+ // The transaction only writes temporary datasets.
+ return isTemporaryDatasetWriteJob;
+ }
+
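+ // The data format is fixed to the system default; there is no per-provider override.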
+ public IDataFormat getFormat() {
+ return FormatUtils.getDefaultFormat();
+ }
+
+ public AsterixStorageProperties getStorageProperties() {
+ return storageProperties;
+ }
+
+ public Map<String, Integer> getLocks() {
+ return locks;
+ }
+
+ public void setLocks(Map<String, Integer> locks) {
+ this.locks = locks;
+ }
+
/**
* Retrieve the Output RecordType, as defined by "set output-record-type".
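* Illustrative usage (AQL): set output-record-type "MyRecordType";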
*/
public ARecordType findOutputRecordType() throws AlgebricksException {
- String outputRecordType = getPropertyValue("output-record-type");
- if (outputRecordType == null) {
+ return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(),
+ getPropertyValue("output-record-type"));
+ }
+
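+ // Falls back to the default dataverse when none is given; returns null if neither is set.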
+ public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
+ String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
+ : dataverse;
+ if (dv == null) {
return null;
}
- String dataverse = getDefaultDataverseName();
- if (dataverse == null) {
- throw new AlgebricksException("Cannot declare output-record-type with no dataverse!");
- }
- IAType type = findType(dataverse, outputRecordType);
- if (!(type instanceof ARecordType)) {
- throw new AlgebricksException("Type " + outputRecordType + " is not a record type!");
- }
- return (ARecordType) type;
+ return MetadataManagerUtil.findDataset(mdTxnCtx, dv, dataset);
+ }
+
+ public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
+ return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName);
+ }
+
+ public IAType findType(String dataverse, String typeName) throws AlgebricksException {
+ return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName);
+ }
+
+ public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
+ return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName);
+ }
+
+ public FeedPolicyEntity findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
+ return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverse, policyName);
}
@Override
public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
- AqlSourceId aqlId = id;
+ return MetadataManagerUtil.findDataSource(mdTxnCtx, id);
+ }
+
+ public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException {
+ return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId);
+ }
+
+ @Override
+ public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
+ throws AlgebricksException {
+ AqlDataSource ads = findDataSource(dataSourceId);
+ Dataset dataset = ((DatasetDataSource) ads).getDataset();
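+ // Prefer a secondary index with the given name; otherwise the id must match the primary index, which shares the dataset's name.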
try {
- return lookupSourceInMetadata(aqlId);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
+ String indexName = indexId;
+ Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName);
+ if (secondaryIndex != null) {
+ return new AqlIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
+ } else {
+ Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), dataset.getDatasetName());
+ if (primaryIndex.getIndexName().equals(indexId)) {
+ return new AqlIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
+ } else {
+ return null;
+ }
+ }
+ } catch (MetadataException me) {
+ throw new AlgebricksException(me);
}
}
- public boolean isWriteTransaction() {
- // The transaction writes persistent datasets.
- return isWriteTransaction;
+ public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
+ return MetadataManagerUtil.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
}
- public boolean isTemporaryDatasetWriteJob() {
- // The transaction only writes temporary datasets.
- return isTemporaryDatasetWriteJob;
+ @Override
+ public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
+ return AsterixBuiltinFunctions.lookupFunction(fid);
}
@Override
@@ -337,8 +388,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
}
- public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource)
- throws AsterixException {
+ public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource) {
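+ // The constraint is built directly from the feed's ingestion locations.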
return new AlgebricksAbsolutePartitionConstraint(feedDataSource.getLocations());
}
@@ -367,64 +417,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
return format;
}
- protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
- Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
- List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
- try {
- configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
- IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
- configuration, itemType, metaType);
-
- // check to see if dataset is indexed
- Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(),
- dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
-
- if (filesIndex != null && filesIndex.getPendingOp() == 0) {
- // get files
- List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
- Iterator<ExternalFile> iterator = files.iterator();
- while (iterator.hasNext()) {
- if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
- iterator.remove();
- }
- }
- // TODO Check this call, result of merge from master!
- // ((IGenericAdapterFactory) adapterFactory).setFiles(files);
- }
-
- return adapterFactory;
- } catch (Exception e) {
- throw new AlgebricksException("Unable to create adapter", e);
- }
- }
-
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
- JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
- throws AlgebricksException {
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only scan datasets of records.");
- }
-
- ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
- RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
- ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
- adapterFactory);
-
- AlgebricksPartitionConstraint constraint;
- try {
- constraint = adapterFactory.getPartitionConstraint();
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
-
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
- }
-
public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
- Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput = null;
+ Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx,
libraryManager);
ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(),
@@ -445,8 +440,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
- return new Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory>(feedIngestor,
- partitionConstraint, adapterFactory);
+ return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
@@ -454,7 +448,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
Object implConfig, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
-
boolean isSecondary = true;
int numSecondaryKeys = 0;
try {
@@ -496,10 +489,10 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
- secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(),
- secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType,
- dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators,
- secondaryIndex.getKeyFieldSourceIndicators(), metaType);
+ secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
+ DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
+ dataset.hasMetaPart(), primaryKeyIndicators, secondaryIndex.getKeyFieldSourceIndicators(),
+ metaType);
comparatorFactories = comparatorFactoriesAndTypeTraits.first;
typeTraits = comparatorFactoriesAndTypeTraits.second;
if (filterTypeTraits != null) {
@@ -519,22 +512,18 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
// get meta item type
ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
- comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType,
- metaItemType, context.getBinaryComparatorFactoryProvider());
+ comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType,
+ context.getBinaryComparatorFactoryProvider());
filterFields = DatasetUtils.createFilterFields(dataset);
btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
}
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
- dataset.getDatasetName(), indexName, temp);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), dataset.getDatasetName(),
+ indexName, temp);
- ISearchOperationCallbackFactory searchCallbackFactory = null;
+ ISearchOperationCallbackFactory searchCallbackFactory;
if (isSecondary) {
searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
: new SecondaryIndexSearchOperationCallbackFactory();
@@ -580,83 +569,29 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
int[] buddyBreeFields = new int[] { numSecondaryKeys };
ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
new ExternalBTreeWithBuddyDataflowHelperFactory(
- compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
- getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
- ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+ compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+ getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
+ ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
}
-
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
-
+ return new Pair<>(btreeSearchOp, spPc.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
- private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
- IndexType indexType, List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes,
- List<List<String>> pidxKeyFieldNames, ARecordType recType, DatasetType dsType, boolean hasMeta,
- List<Integer> primaryIndexKeyIndicators, List<Integer> secondaryIndexIndicators, ARecordType metaType)
- throws AlgebricksException {
-
- IBinaryComparatorFactory[] comparatorFactories;
- ITypeTraits[] typeTraits;
- int sidxKeyFieldCount = sidxKeyFieldNames.size();
- int pidxKeyFieldCount = pidxKeyFieldNames.size();
- typeTraits = new ITypeTraits[sidxKeyFieldCount + pidxKeyFieldCount];
- comparatorFactories = new IBinaryComparatorFactory[sidxKeyFieldCount + pidxKeyFieldCount];
-
- int i = 0;
- for (; i < sidxKeyFieldCount; ++i) {
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
- sidxKeyFieldNames.get(i),
- (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
- IAType keyType = keyPairType.first;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
-
- for (int j = 0; j < pidxKeyFieldCount; ++j, ++i) {
- IAType keyType = null;
- try {
- switch (dsType) {
- case INTERNAL:
- keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1)
- ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
- : recType.getSubFieldType(pidxKeyFieldNames.get(j));
- break;
- case EXTERNAL:
- keyType = IndexingConstants.getFieldType(j);
- break;
- default:
- throw new AlgebricksException("Unknown Dataset Type");
- }
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
-
- return new Pair<IBinaryComparatorFactory[], ITypeTraits[]>(comparatorFactories, typeTraits);
- }
-
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
-
try {
- ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(),
- dataset.getItemTypeName());
+ ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
boolean temp = dataset.getDatasetDetails().isTemp();
@@ -670,9 +605,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
int numSecondaryKeys = secondaryKeyFields.size();
if (numSecondaryKeys != 1) {
- throw new AlgebricksException("Cannot use " + numSecondaryKeys
- + " fields as a key for the R-tree index. "
- + "There can be only one field as a key for the R-tree index.");
+ throw new AlgebricksException(
+ "Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. "
+ + "There can be only one field as a key for the R-tree index.");
}
Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
secondaryKeyFields.get(0), recType);
@@ -701,8 +636,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
- splitProviderAndPartitionConstraintsForDataset(
- dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
+ splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName, temp);
ARecordType metaType = null;
if (dataset.hasMetaPart()) {
metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
@@ -770,28 +705,12 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
}
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
-
+ return new Pair<>(rtreeSearchOp, spPc.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
- private IBinaryComparatorFactory[] getMergedComparatorFactories(IBinaryComparatorFactory[] comparatorFactories,
- IBinaryComparatorFactory[] primaryComparatorFactories) {
- IBinaryComparatorFactory[] btreeCompFactories = null;
- int btreeCompFactoriesCount = comparatorFactories.length + primaryComparatorFactories.length;
- btreeCompFactories = new IBinaryComparatorFactory[btreeCompFactoriesCount];
- int i = 0;
- for (; i < comparatorFactories.length; i++) {
- btreeCompFactories[i] = comparatorFactories[i];
- }
- for (int j = 0; i < btreeCompFactoriesCount; i++, j++) {
- btreeCompFactories[i] = primaryComparatorFactories[j];
- }
- return btreeCompFactories;
- }
-
@Override
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
@@ -804,7 +723,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
getWriterFactory(), inputDesc);
AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
- return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
+ return new Pair<>(runtime, apc);
}
@Override
@@ -814,7 +733,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
ResultSetDataSink rsds = (ResultSetDataSink) sink;
ResultSetSinkId rssId = rsds.getId();
ResultSetId rsId = rssId.getResultSetId();
-
ResultWriterOperatorDescriptor resultWriter = null;
try {
IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
@@ -824,70 +742,24 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
} catch (IOException e) {
throw new AlgebricksException(e);
}
-
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(resultWriter, null);
+ return new Pair<>(resultWriter, null);
}
@Override
- public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
- throws AlgebricksException {
- AqlDataSource ads = findDataSource(dataSourceId);
- Dataset dataset = ((DatasetDataSource) ads).getDataset();
-
- try {
- String indexName = indexId;
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
- if (secondaryIndex != null) {
- return new AqlIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
- } else {
- Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), dataset.getDatasetName());
- if (primaryIndex.getIndexName().equals(indexId)) {
- return new AqlIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
- } else {
- return null;
- }
- }
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
- }
-
- public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException, MetadataException {
- Dataset dataset = findDataset(aqlId.getDataverseName(), aqlId.getDatasourceName());
- if (dataset == null) {
- throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
- }
- IAType itemType = findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
- IAType metaItemType = findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
- INodeDomain domain = findNodeDomain(dataset.getNodeGroupName());
- byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL)
- ? AqlDataSourceType.EXTERNAL_DATASET : AqlDataSourceType.INTERNAL_DATASET;
- return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType,
- dataset.getDatasetDetails(), domain);
- }
-
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
- IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
- JobSpecification spec) throws AlgebricksException {
- String dataverseName = dataSource.getId().getDataverseName();
- String datasetName = dataSource.getId().getDatasourceName();
-
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
- }
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+ LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ String dataverseName = dataSource.getId().getDataverseName();
+ String datasetName = dataSource.getId().getDatasourceName();
+ Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
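+ // The helper presumably raises the "unknown dataset" error itself when the dataset is absent.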
int numKeys = keys.size();
int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
// move key fields to front
int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
int[] bloomFilterKeyFields = new int[numKeys];
- // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
int i = 0;
for (LogicalVariable varKey : keys) {
int idx = propagatedSchema.findVariable(varKey);
@@ -923,8 +795,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
itemType, metaType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+ splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
+ datasetName, indexName, temp);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -953,114 +825,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
filterCmpFactories, btreeFields, filterFields, !temp));
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
- splitsAndConstraint.second);
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
- }
-
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
- IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload,
- List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
-
- String datasetName = dataSource.getId().getDatasourceName();
- Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
- if (dataset == null) {
- throw new AlgebricksException(
- "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
- }
- boolean temp = dataset.getDatasetDetails().isTemp();
- isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
- int numKeys = keys.size();
- int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
- // Move key fields to front.
- int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
- + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
- int[] bloomFilterKeyFields = new int[numKeys];
- int i = 0;
- for (LogicalVariable varKey : keys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- bloomFilterKeyFields[i] = i;
- i++;
- }
- fieldPermutation[i++] = propagatedSchema.findVariable(payload);
- if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
- fieldPermutation[i++] = idx;
- }
- if (additionalNonFilteringFields != null) {
- for (LogicalVariable variable : additionalNonFilteringFields) {
- int idx = propagatedSchema.findVariable(variable);
- fieldPermutation[i++] = idx;
- }
- }
-
- try {
- Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), dataset.getDatasetName());
- String indexName = primaryIndex.getIndexName();
- ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
- .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
- ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
- ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
-
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
- itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataSource.getId().getDataverseName(), datasetName, indexName, temp);
-
- // prepare callback
- JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- int[] primaryKeyFields = new int[numKeys];
- for (i = 0; i < numKeys; i++) {
- primaryKeyFields[i] = i;
- }
-
- ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
- itemType, context.getBinaryComparatorFactoryProvider());
- int[] filterFields = DatasetUtils.createFilterFields(dataset);
- int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-
- TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
- : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
- txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart());
-
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
- .getMergePolicyFactory(dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
- new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
- btreeFields, filterFields, !temp);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
- } else {
- op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
- fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
- NoOpOperationCallbackFactory.INSTANCE);
- }
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-
+ return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
@@ -1072,7 +837,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, boolean bulkload) throws AlgebricksException {
- return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, typeEnv, keys, payload,
+ return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload,
additionalNonKeyFields, recordDesc, context, spec, bulkload, additionalNonFilteringFields);
}
@@ -1081,58 +846,10 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
- return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, typeEnv, keys, payload,
+ return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload,
additionalNonKeyFields, recordDesc, context, spec, false, null);
}
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(
- IndexOperation indexOp, IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
- String indexName = dataSourceIndex.getId();
- String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
- String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
-
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- Index secondaryIndex;
- try {
- secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
- } catch (MetadataException e) {
- throw new AlgebricksException(e);
- }
- AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
- switch (secondaryIndex.getIndexType()) {
- case BTREE: {
- return getBTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
- secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
- bulkload);
- }
- case RTREE: {
- return getRTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
- secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
- bulkload);
- }
- case SINGLE_PARTITION_WORD_INVIX:
- case SINGLE_PARTITION_NGRAM_INVIX:
- case LENGTH_PARTITIONED_WORD_INVIX:
- case LENGTH_PARTITIONED_NGRAM_INVIX: {
- return getInvertedIndexDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
- primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec,
- indexOp, secondaryIndex.getIndexType(), bulkload);
- }
- default: {
- throw new AlgebricksException(
- "Insert and delete not implemented for index type: " + secondaryIndex.getIndexType());
- }
- }
- }
-
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
@@ -1140,9 +857,34 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
boolean bulkload) throws AlgebricksException {
- return getIndexInsertOrDeleteRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas,
- typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
- bulkload);
+ return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema,
+ inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+ context, spec, bulkload, null, null);
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+ IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+ ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema,
+ inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+ context, spec, false, null, null);
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+ IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+ ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
+ inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc,
+ context, spec, false, prevSecondaryKeys, prevAdditionalFilteringKey);
}
@Override
@@ -1156,17 +898,14 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
- IOperatorSchema inputSchema = new OperatorSchemaImpl();
+ IOperatorSchema inputSchema;
if (inputSchemas.length > 0) {
inputSchema = inputSchemas[0];
} else {
throw new AlgebricksException("TokenizeOperator can not operate without any input variable.");
}
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
+ Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
Index secondaryIndex;
try {
secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -1174,1171 +913,890 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
- AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
// TokenizeOperator only supports a keyword or n-gram index.
switch (secondaryIndex.getIndexType()) {
case SINGLE_PARTITION_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
- case LENGTH_PARTITIONED_NGRAM_INVIX: {
+ case LENGTH_PARTITIONED_NGRAM_INVIX:
return getBinaryTokenizerRuntime(dataverseName, datasetName, indexName, inputSchema, propagatedSchema,
- typeEnv, primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
- IndexOperation.INSERT, secondaryIndex.getIndexType(), bulkload);
- }
- default: {
+ primaryKeys, secondaryKeys, recordDesc, spec, secondaryIndex.getIndexType());
+ default:
throw new AlgebricksException("Currently, we do not support TokenizeOperator for the index type: "
+ secondaryIndex.getIndexType());
- }
}
-
}
- // Get a Tokenizer for the bulk-loading data into a n-gram or keyword index.
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBinaryTokenizerRuntime(String dataverseName,
- String datasetName, String indexName, IOperatorSchema inputSchema, IOperatorSchema propagatedSchema,
- IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload)
- throws AlgebricksException {
-
- // Sanity checks.
- if (primaryKeys.size() > 1) {
- throw new AlgebricksException("Cannot tokenize composite primary key.");
+ /**
+ * Calculate an estimated size of the bloom filter. Note that this is an
+ * estimate which assumes that the data is uniformly distributed across all
+ * partitions.
+ *
+ * @param dataset
+ * @return Number of elements that will be used to create a bloom filter per
+ * dataset per partition
+ * @throws MetadataException
+ * @throws AlgebricksException
+ */
+ public long getCardinalityPerPartitionHint(Dataset dataset) throws MetadataException, AlgebricksException {
+ String numElementsHintString = dataset.getHints().get(DatasetCardinalityHint.NAME);
+ long numElementsHint;
+ if (numElementsHintString == null) {
+ numElementsHint = DatasetCardinalityHint.DEFAULT;
+ } else {
+ numElementsHint = Long.parseLong(numElementsHintString);
}
- if (secondaryKeys.size() > 1) {
- throw new AlgebricksException("Cannot tokenize composite secondary key fields.");
+ int numPartitions = 0;
+ List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+ .getNodeNames();
+ for (String nd : nodeGroup) {
+ numPartitions += AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
}
+ numElementsHint = numElementsHint / numPartitions;
+ return numElementsHint;
+ }
- boolean isPartitioned;
- if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
- || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
- isPartitioned = true;
- } else {
- isPartitioned = false;
- }
+ protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
+ Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
+ List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
+ try {
+ configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
+ IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
+ configuration, itemType, metaType);
- // Number of Keys that needs to be propagated
- int numKeys = inputSchema.getSize();
+ // check to see if dataset is indexed
+ Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(),
+ dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
- // Get the rest of Logical Variables that are not (PK or SK) and each
- // variable's positions.
- // These variables will be propagated through TokenizeOperator.
- List<LogicalVariable> otherKeys = new ArrayList<LogicalVariable>();
- if (inputSchema.getSize() > 0) {
- for (int k = 0; k < inputSchema.getSize(); k++) {
- boolean found = false;
- for (LogicalVariable varKey : primaryKeys) {
- if (varKey.equals(inputSchema.getVariable(k))) {
- found = true;
- break;
- } else {
- found = false;
+ if (filesIndex != null && filesIndex.getPendingOp() == 0) {
+ // get files
+ List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+ Iterator<ExternalFile> iterator = files.iterator();
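+ // Keep only files whose metadata operation has completed; files still being added or dropped are skipped.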
+ while (iterator.hasNext()) {
+ if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
+ iterator.remove();
}
}
- if (!found) {
- for (LogicalVariable varKey : secondaryKeys) {
- if (varKey.equals(inputSchema.getVariable(k))) {
- found = true;
- break;
- } else {
- found = false;
- }
- }
- }
- if (!found) {
- otherKeys.add(inputSchema.getVariable(k));
- }
}
- }
-
- // For tokenization, sorting and loading.
- // One token (+ optional partitioning field) + primary keys + secondary
- // keys + other variables
- // secondary keys and other variables will be just passed to the
- // IndexInsertDelete Operator.
- int numTokenKeyPairFields = (!isPartitioned) ? 1 + numKeys : 2 + numKeys;
-
- // generate field permutations for the input
- int[] fieldPermutation = new int[numKeys];
-
- int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
- int i = 0;
- int j = 0;
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- modificationCallbackPrimaryKeyFields[j] = i;
- i++;
- j++;
- }
- for (LogicalVariable varKey : otherKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- for (LogicalVariable varKey : secondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
+ return adapterFactory;
+ } catch (Exception e) {
+ throw new AlgebricksException("Unable to create adapter", e);
}
- String itemTypeName = dataset.getItemTypeName();
- IAType itemType;
- try {
- itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
- .getDatatype();
-
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be tokenized.");
- }
-
- ARecordType recType = (ARecordType) itemType;
-
- // Index parameters.
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
-
- List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
- List<IAType> secondaryKeyTypeEntries = secondaryIndex.getKeyFieldTypes();
+ }
- int numTokenFields = (!isPartitioned) ? secondaryKeys.size() : secondaryKeys.size() + 1;
- ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
- ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
+ public JobId getJobId() {
+ return jobId;
+ }
- // Find the key type of the secondary key. If it's a derived type,
- // return the derived type.
- // e.g. UNORDERED LIST -> return UNORDERED LIST type
- IAType secondaryKeyType = null;
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0),
- secondaryKeyExprs.get(0), recType);
- secondaryKeyType = keyPairType.first;
- List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
- i = 0;
- for (List<String> partitioningKey : partitioningKeys) {
- IAType keyType = recType.getSubFieldType(partitioningKey);
- invListsTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
+ public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
+ throws AlgebricksException {
+ return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
+ numKeyFields / 2);
+ }
- tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
- if (isPartitioned) {
- // The partitioning field is hardcoded to be a short *without*
- // an Asterix type tag.
- tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
- }
+ public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
+ String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
+ FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
+ return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+ }
- IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
- secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
+ public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
+ String dataverse) {
+ return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse);
+ }
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
+ public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
+ String targetIdxName, boolean temp) throws AlgebricksException {
+ return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
+ }
- // Generate Output Record format
- ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
- ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
- ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
+ public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
+ throws MetadataException {
+ DatasourceAdapter adapter;
+ // search in default namespace (built-in adapter)
+ adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
- // The order of the output record: propagated variables (including
- // PK and SK), token, and number of token.
- // #1. propagate all input variables
- for (int k = 0; k < recordDesc.getFieldCount(); k++) {
- tokenKeyPairFields[k] = recordDesc.getFields()[k];
- tokenKeyPairTypeTraits[k] = recordDesc.getTypeTraits()[k];
- }
- int tokenOffset = recordDesc.getFieldCount();
+ // search in dataverse (user-defined adapter)
+ if (adapter == null) {
+ adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
+ }
+ return adapter;
+ }
- // #2. Specify the token type
- tokenKeyPairFields[tokenOffset] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
- tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[0];
- tokenOffset++;
+ public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+ return AsterixClusterProperties.INSTANCE.getClusterLocations();
+ }
- // #3. Specify the length-partitioning key: number of token
- if (isPartitioned) {
- tokenKeyPairFields[tokenOffset] = ShortSerializerDeserializer.INSTANCE;
- tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[1];
- }
+ public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
+ String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
+ return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
+ datasetName, targetIdxName, create);
+ }
- RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
- IOperatorDescriptor tokenizerOp;
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
+ JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, int[] ridIndexes, boolean retainInput,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema,
+ JobGenContext context, AqlMetadataProvider metadataProvider, boolean retainMissing)
+ throws AlgebricksException {
+ try {
+ // Get data type
+ IAType itemType;
+ itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+ dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
- // Keys to be tokenized : SK
- int docField = fieldPermutation[fieldPermutation.length - 1];
+ // Create the adapter factory. Right now there is only one; if more are
+ // added in the future, this can become a map.
+ ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory(libraryManager,
+ datasetDetails.getProperties(), (ARecordType) itemType, ridIndexes, retainInput, retainMissing,
+ context.getMissingWriterFactory());
- // Keys to be propagated
- int[] keyFields = new int[numKeys];
- for (int k = 0; k < keyFields.length; k++) {
- keyFields[k] = k;
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
+ try {
+ compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+ } catch (MetadataException e) {
+ throw new AlgebricksException("Unable to create merge policy factory for external dataset", e);
}
- tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory, docField,
- keyFields, isPartitioned, true);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(tokenizerOp,
- splitsAndConstraint.second);
+ boolean temp = datasetDetails.isTemp();
+ // Create the file index data flow helper
+ ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
+ compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
+ ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider), !temp);
+ // Create the output record descriptor, appContext, and fileSplitProvider for the files index
+ RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
+ metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+ dataset.getDatasetName(),
+ dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
+ ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+ : new SecondaryIndexSearchOperationCallbackFactory();
+ // Create the operator
+ ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
+ outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
+ appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(),
+ metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory,
+ retainMissing, context.getMissingWriterFactory());
+ return new Pair<>(op, spPc.second);
} catch (Exception e) {
throw new AlgebricksException(e);
}
}
@Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
- IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
- IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
- ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
- return getIndexInsertOrDeleteRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
- typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
- false);
- }
-
- private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
- throws AlgebricksException {
- // No filtering condition.
- if (filterExpr == null) {
- return null;
- }
- IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
- IScalarEvaluatorFactory filterEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(filterExpr,
- typeEnv, inputSchemas, context);
- return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspectorFactory());
- }
-
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String dataverseName,
- String datasetName, String indexName, IOperatorSchema propagatedSchema,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
- boolean bulkload) throws AlgebricksException {
-
- Dataset dataset = findDataset(dataverseName, datasetName);
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
+ List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ String datasetName = dataSource.getId().getDatasourceName();
+ Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
+ throw new AlgebricksException(
+ "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
}
boolean temp = dataset.getDatasetDetails().isTemp();
isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
- int numKeys = primaryKeys.size() + secondaryKeys.size();
+ int numKeys = primaryKeys.size();
int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-
- // generate field permutations
- int[] fieldPermutation = new int[numKeys + numFilterFields];
- int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
- int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+ int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
+ // Move the key fields to the front: [keys, record, filters, additional fields]
+ int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
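+ // e.g., with two primary keys, a filter, and one additional field, the layout
+ // is [pk0, pk1, record, filter, additional0]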
+ int[] bloomFilterKeyFields = new int[numKeys];
int i = 0;
- int j = 0;
- for (LogicalVariable varKey : secondaryKeys) {
+ // set the keys' permutations
+ for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
bloomFilterKeyFields[i] = i;
i++;
}
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- modificationCallbackPrimaryKeyFields[j] = i;
- i++;
- j++;
- }
+ // set the record permutation
+ fieldPermutation[i++] = propagatedSchema.findVariable(payload);
+ // set the filters' permutations.
if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
- fieldPermutation[numKeys] = idx;
+ int idx = propagatedSchema.findVariable(filterKeys.get(0));
+ fieldPermutation[i++] = idx;
}
- String itemTypeName = dataset.getItemTypeName();
- IAType itemType;
- try {
- itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
- .getDatatype();
-
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
-
- ARecordType recType = (ARecordType) itemType;
-
- // Index parameters.
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
-
- ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
- recType, context.getBinaryComparatorFactoryProvider());
- int[] filterFields = null;
- int[] btreeFields = null;
- if (filterTypeTraits != null) {
- filterFields = new int[1];
- filterFields[0] = numKeys;
- btreeFields = new int[numKeys];
- for (int k = 0; k < btreeFields.length; k++) {
- btreeFields[k] = k;
- }
+ if (additionalNonFilterFields != null) {
+ for (LogicalVariable var : additionalNonFilterFields) {
+ int idx = propagatedSchema.findVariable(var);
+ fieldPermutation[i++] = idx;
}
+ }
- List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
- List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
- ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
- for (i = 0; i < secondaryKeys.size(); ++i) {
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
- secondaryKeyNames.get(i), recType);
- IAType keyType = keyPairType.first;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
- List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
- for (List<String> partitioningKey : partitioningKeys) {
- IAType keyType = recType.getSubFieldType(partitioningKey);
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
+ try {
+ Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+ dataset.getDatasetName(), dataset.getDatasetName());
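+ // (the primary index shares its name with the dataset)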
+ String indexName = primaryIndex.getIndexName();
+ String itemTypeName = dataset.getItemTypeName();
+ String itemTypeDataverseName = dataset.getItemTypeDataverseName();
+ ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+ .getDatatype(mdTxnCtx, itemTypeDataverseName, itemTypeName).getDatatype();
+ ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+ itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
+ splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
+ indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
+ int[] primaryKeyFields = new int[numKeys];
+ for (i = 0; i < numKeys; i++) {
+ primaryKeyFields[i] = i;
+ }
+
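+ // When the dataset declares a filter field, compute its type traits, comparators,
+ // and positions so the LSM components can maintain their min/max filters.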
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ itemType, context.getBinaryComparatorFactoryProvider());
+ int[] filterFields = DatasetUtils.createFilterFields(dataset);
+ int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
IModificationOperationCallbackFactory modificationCallbackFactory = temp
- ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
- : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
- dataset.hasMetaPart());
+ ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE)
+ : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
+ IndexOperation.UPSERT, ResourceType.LSM_BTREE, dataset.hasMetaPart());
+
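+ // An upsert must read the previous version of a record before writing the new
+ // one, so the search callback takes the key lock before searching the index.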
+ LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
+ jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
.getMergePolicyFactory(dataset, mdTxnCtx);
IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
+ storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
btreeFields, filterFields, !temp);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
- } else {
- op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
- fieldPermutation, indexOp,
- new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
- compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields, !temp),
- filterFactory, false, indexName, null, modificationCallbackFactory,
- NoOpOperationCallbackFactory.INSTANCE);
+ AsterixLSMTreeUpsertOperatorDescriptor op;
+
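+ // The output tuple is the input tuple plus the previous record, the previous
+ // meta record (if the dataset has one), and the previous filter value (if any);
+ // build the output descriptor accordingly.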
+ ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount()
+ + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+ ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
+ + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+ for (int j = 0; j < recordDesc.getFieldCount(); j++) {
+ outputTypeTraits[j] = recordDesc.getTypeTraits()[j];
+ outputSerDes[j] = recordDesc.getFields()[j];
}
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- }
+ outputSerDes[outputSerDes.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils
+ .getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+ outputTypeTraits[outputTypeTraits.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils
+ .getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexDmlRuntime(String dataverseName,
- String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
- RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
- IndexType indexType, boolean bulkload) throws AlgebricksException {
+ if (dataset.hasMetaPart()) {
+ outputSerDes[outputSerDes.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
+ .getSerdeProvider().getSerializerDeserializer(metaItemType);
+ outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
+ .getTypeTraitProvider().getTypeTrait(metaItemType);
+ }
- // Check the index is length-partitioned or not.
- boolean isPartitioned;
- if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
- || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
- isPartitioned = true;
- } else {
- isPartitioned = false;
- }
+ int fieldIdx = -1;
+ if (numFilterFields > 0) {
+ String filterField = DatasetUtils.getFilterField(dataset).get(0);
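+ // Find the filter field's position in the record type so its previous value
+ // can be projected into the output tuple.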
+ for (i = 0; i < itemType.getFieldNames().length; i++) {
+ if (itemType.getFieldNames()[i].equals(filterField)) {
+ break;
+ }
+ }
+ fieldIdx = i;
+ outputTypeTraits[outputTypeTraits.length - 1] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
+ .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+ outputSerDes[outputSerDes.length - 1] = FormatUtils.getDefaultFormat().getSerdeProvider()
+ .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+ }
- // Sanity checks.
- if (primaryKeys.size() > 1) {
- throw new AlgebricksException("Cannot create inverted index on dataset with composite primary key.");
- }
- // The size of secondaryKeys can be two if it receives input from its
- // TokenizeOperator- [token, number of token]
- if (secondaryKeys.size() > 1 && !isPartitioned) {
- throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
- } else if (secondaryKeys.size() > 2 && isPartitioned) {
- throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
- }
+ RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
+ op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+ idfh, null, true, indexName, context.getMissingWriterFactory(), modificationCallbackFactory,
+ searchCallbackFactory, null);
+ op.setType(itemType);
+ op.setFilterIndex(fieldIdx);
+ return new Pair<>(op, splitsAndConstraint.second);
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new Alge
<TRUNCATED>