You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/03/08 20:53:17 UTC
[2/3] asterixdb git commit: Cleanup and bug fixes in Feeds pipeline
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a89d13e..3b6e7ff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -41,8 +41,6 @@ public class ExternalDataConstants {
public static final String KEY_FILESYSTEM = "fs";
// specifies the address of the HDFS name node
public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
- // specifies whether a feed sends progress markers or not
- public static final String KEY_SEND_MARKER = "send-marker";
// specifies the class implementation of the accessed instance of HDFS
public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index d009960..a09ff9b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.external.util;
-import java.util.HashMap;
+import java.util.EnumMap;
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -162,7 +162,7 @@ public class ExternalDataUtils {
private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
- Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
+ Map<ATypeTag, IValueParserFactory> m = new EnumMap<>(ATypeTag.class);
m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
@@ -339,8 +339,4 @@ public class ExternalDataUtils {
}
return intIndicators;
}
-
- public static boolean isSendMarker(Map<String, String> configuration) {
- return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_SEND_MARKER));
- }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index d286ff9..3863920 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -79,12 +79,10 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
ITupleForwarder forwarder;
ArrayTupleBuilder tb;
IPropertiesProvider propertiesProvider =
- (IPropertiesProvider) ((NodeControllerService) ctx
- .getJobletContext().getApplicationContext().getControllerService())
- .getApplicationContext()
- .getApplicationObject();
- ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions()
- .get(nodeId)[0];
+ (IPropertiesProvider) ((NodeControllerService) ctx.getJobletContext().getApplicationContext()
+ .getControllerService()).getApplicationContext().getApplicationObject();
+ ClusterPartition nodePartition =
+ propertiesProvider.getMetadataProperties().getNodePartitions().get(nodeId)[0];
parser = new ADMDataParser(outputType, true);
forwarder = DataflowUtils.getTupleForwarder(configuration,
FeedUtils.getFeedLogManager(ctx,
@@ -144,5 +142,4 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
public ARecordType getMetaType() {
return null;
}
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f5c6d9a..6901e1d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -310,8 +310,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
}
public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
- String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
- : dataverse;
+ String dv =
+ dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName()) : dataverse;
if (dv == null) {
return null;
}
@@ -353,25 +353,15 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
throws AlgebricksException {
DataSource source = findDataSource(dataSourceId);
Dataset dataset = ((DatasetDataSource) source).getDataset();
- try {
- String indexName = indexId;
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
- if (secondaryIndex != null) {
- return new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
- } else {
- Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), dataset.getDatasetName());
- if (primaryIndex.getIndexName().equals(indexId)) {
- return new DataSourceIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(),
- this);
- } else {
- return null;
- }
- }
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
+ String indexName = indexId;
+ Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+ return (secondaryIndex != null)
+ ? new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this)
+ : null;
+ }
+
+ public Index getIndex(String dataverseName, String datasetName, String indexName) throws AlgebricksException {
+ return MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
}
public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
@@ -405,8 +395,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
throws AlgebricksException {
- ExternalScanOperatorDescriptor dataScanner =
- new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
+ ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
AlgebricksPartitionConstraint constraint;
try {
constraint = adapterFactory.getPartitionConstraint();
@@ -462,8 +451,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
boolean retainMissing, Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields,
- boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes,
- int[] maxFilterFieldIndexes) throws AlgebricksException {
+ boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes)
+ throws AlgebricksException {
boolean isSecondary = true;
int numSecondaryKeys = 0;
try {
@@ -530,8 +519,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- spPc = getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(), indexName,
- temp);
+ spPc = getSplitProviderAndConstraints(dataset, theIndex.getIndexName());
ISearchOperationCallbackFactory searchCallbackFactory;
if (isSecondary) {
@@ -586,8 +574,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
try {
- ARecordType recType =
- (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+ ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
boolean temp = dataset.getDatasetDetails().isTemp();
@@ -630,8 +617,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = getSplitProviderAndConstraints(
- dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
+ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
ARecordType metaType = null;
if (dataset.hasMetaPart()) {
metaType =
@@ -751,7 +738,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
- String indexName = primaryIndex.getIndexName();
ARecordType metaType = dataset.hasMetaPart()
? (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
: null;
@@ -763,8 +749,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
itemType, metaType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
- temp);
+ getSplitProviderAndConstraints(dataset);
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -779,9 +764,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
new TreeIndexBulkLoadOperatorDescriptor(spec, null, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false,
- numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
- metaType, compactionInfo.first, compactionInfo.second),
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR,
+ false, numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex,
+ itemType, metaType, compactionInfo.first, compactionInfo.second),
metadataPageManagerFactory);
return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
} catch (MetadataException me) {
@@ -951,12 +936,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
numKeyFields / 2);
}
- public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(String dataverseName,
- String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
- FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
- return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
- }
-
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(String dataverse) {
return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse);
}
@@ -970,8 +949,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
throws MetadataException {
DatasourceAdapter adapter;
// search in default namespace (built-in adapter)
- adapter =
- MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+ adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
// search in dataverse (user-defined adapter)
if (adapter == null) {
@@ -985,8 +963,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
}
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
- String dataverseName, String datasetName, String targetIdxName, boolean create)
- throws AlgebricksException {
+ String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(mdTxnCtx, dataverseName, datasetName,
targetIdxName, create);
}
@@ -1104,8 +1081,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
- temp);
+ getSplitProviderAndConstraints(dataset);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
@@ -1171,7 +1147,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, null, true, indexName,
context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory, null,
- metadataPageManagerFactory);
+ metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
op.setType(itemType);
op.setFilterIndex(fieldIdx);
return new Pair<>(op, splitsAndConstraint.second);
@@ -1222,8 +1198,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i),
(hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
IAType keyType = keyPairType.first;
- comparatorFactories[i] =
- BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+ comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
@@ -1245,8 +1220,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
} catch (AsterixException e) {
throw new AlgebricksException(e);
}
- comparatorFactories[i] =
- BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+ comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
@@ -1295,8 +1269,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
dataset.getDatasetName(), dataset.getDatasetName());
String indexName = primaryIndex.getIndexName();
ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
- .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName())
- .getDatatype();
+ .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
@@ -1304,8 +1277,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
- temp);
+ getSplitProviderAndConstraints(dataset);
// prepare callback
int datasetId = dataset.getDatasetId();
@@ -1462,10 +1434,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
itemType = (ARecordType) MetadataManager.INSTANCE
.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
validateRecordType(itemType);
- ARecordType metaType = dataset.hasMetaPart()
- ? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
- dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype()
- : null;
+ ARecordType metaType =
+ dataset.hasMetaPart()
+ ? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
+ dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype()
+ : null;
// Index parameters.
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -1488,8 +1461,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
for (i = 0; i < secondaryKeys.size(); ++i) {
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
- secondaryKeyNames.get(i), itemType);
+ Pair<IAType, Boolean> keyPairType =
+ Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i), secondaryKeyNames.get(i), itemType);
IAType keyType = keyPairType.first;
comparatorFactories[i] =
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
@@ -1506,18 +1479,17 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
IModificationOperationCallbackFactory modificationCallbackFactory = temp
? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_BTREE)
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
: new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_BTREE, dataset.hasMetaPart());
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
+ dataset.hasMetaPart());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1536,7 +1508,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false,
indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
- prevFieldPermutation, metadataPageManagerFactory);
+ prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
@@ -1648,7 +1620,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
int[] btreeFields = new int[primaryComparatorFactories.length];
for (int k = 0; k < btreeFields.length; k++) {
btreeFields[k] = k + numSecondaryKeys;
@@ -1671,11 +1643,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
int datasetId = dataset.getDatasetId();
IModificationOperationCallbackFactory modificationCallbackFactory = temp
? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_RTREE)
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
: new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_RTREE, dataset.hasMetaPart());
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
+ dataset.hasMetaPart());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1694,13 +1665,13 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, null, fieldPermutation, indexDataflowHelperFactory, filterFactory, false,
indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
- prevFieldPermutation, metadataPageManagerFactory);
+ prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory,
- filterFactory, false, indexName, null, modificationCallbackFactory,
- NoOpOperationCallbackFactory.INSTANCE, metadataPageManagerFactory);
+ comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory, filterFactory,
+ false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+ metadataPageManagerFactory);
}
return new Pair<>(op, splitsAndConstraint.second);
} catch (MetadataException e) {
@@ -1874,7 +1845,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2055,8 +2026,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataverseName, datasetName, indexName,
- dataset.getDatasetDetails().isTemp());
+ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
// Generate Output Record format
ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
@@ -2125,4 +2095,18 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
public IStorageComponentProvider getStorageComponentProvider() {
return storaegComponentProvider;
}
+
+ public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds)
+ throws AlgebricksException {
+ FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), ds.getDatasetName(),
+ ds.getDatasetDetails().isTemp());
+ return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+ }
+
+ public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds,
+ String indexName) throws AlgebricksException {
+ FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), indexName,
+ ds.getDatasetDetails().isTemp());
+ return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 2e328f9..34faf63 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -29,14 +29,15 @@ import java.util.logging.Logger;
import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.utils.JobUtils;
@@ -84,6 +85,7 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -409,8 +411,8 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
case LENGTH_PARTITIONED_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case SINGLE_PARTITION_WORD_INVIX:
- return invertedIndexDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, this,
- index, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits,
+ return invertedIndexDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, this, index,
+ recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits,
filterCmpFactories);
default:
throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
@@ -577,4 +579,30 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
datasetPartitions, isSink);
}
+
+ /**
+ * Get the index dataflow helper factory for the dataset's primary index
+ *
+ * @param mdProvider
+ * an instance of metadata provider that is used to fetch metadata information
+ * @throws AlgebricksException
+ */
+ public IIndexDataflowHelperFactory getIndexDataflowHelperFactory(MetadataProvider mdProvider)
+ throws AlgebricksException {
+ if (getDatasetType() != DatasetType.INTERNAL) {
+ throw new AlgebricksException(ErrorCode.ASTERIX,
+ ErrorCode.COMPILATION_DATASET_TYPE_DOES_NOT_HAVE_PRIMARY_INDEX, getDatasetType());
+ }
+ Index index = mdProvider.getIndex(getDataverseName(), getDatasetName(), getDatasetName());
+ ARecordType recordType = (ARecordType) mdProvider.findType(getItemTypeDataverseName(), getItemTypeName());
+ ARecordType metaType = (ARecordType) mdProvider.findType(getMetaItemTypeDataverseName(), getMetaItemTypeName());
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+ DatasetUtil.getMergePolicyFactory(this, mdProvider.getMetadataTxnContext());
+ return getIndexDataflowHelperFactory(mdProvider, index, recordType, metaType, compactionInfo.first,
+ compactionInfo.second);
+ }
+
+ public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
+ return NoOpFrameOperationCallbackFactory.INSTANCE;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 80792b5..572cc75 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -330,14 +330,12 @@ public class DatasetUtil {
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return RuntimeUtils.createJobSpecification();
}
- boolean temp = dataset.getDatasetDetails().isTemp();
ARecordType itemType =
(ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
- dataset.getDatasetName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
@@ -386,15 +384,12 @@ public class DatasetUtil {
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return RuntimeUtils.createJobSpecification();
}
-
- boolean temp = dataset.getDatasetDetails().isTemp();
ARecordType itemType =
(ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
- dataset.getDatasetName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
@@ -429,7 +424,6 @@ public class DatasetUtil {
}
Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName, datasetName);
- boolean temp = dataset.getDatasetDetails().isTemp();
ARecordType itemType =
(ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
// get meta item type
@@ -451,7 +445,7 @@ public class DatasetUtil {
int[] btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset);
FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < fs.length; i++) {
@@ -495,7 +489,6 @@ public class DatasetUtil {
if (dataset == null) {
throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
}
- boolean temp = dataset.getDatasetDetails().isTemp();
ARecordType itemType =
(ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
ARecordType metaItemType = DatasetUtil.getMetaType(metadataProvider, dataset);
@@ -505,7 +498,7 @@ public class DatasetUtil {
ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
int[] blooFilterKeyFields = DatasetUtil.createBloomFilterKeyFields(dataset);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
index 249f035..edaa73e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
@@ -452,12 +452,11 @@ public class ExternalIndexingOperations {
JobSpecification spec = RuntimeUtils.createJobSpecification();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
- boolean temp = ds.getDatasetDetails().isTemp();
ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds,
+ IndexingConstants.getFilesIndexName(ds.getDatasetName()));
IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -472,8 +471,7 @@ public class ExternalIndexingOperations {
for (Index index : indexes) {
if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- index.getIndexName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
treeDataflowHelperFactories.add(indexDataflowHelperFactory);
@@ -499,11 +497,9 @@ public class ExternalIndexingOperations {
DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-
- boolean temp = ds.getDatasetDetails().isTemp();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds,
+ IndexingConstants.getFilesIndexName(ds.getDatasetName()));
IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -518,8 +514,7 @@ public class ExternalIndexingOperations {
for (Index index : indexes) {
if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- index.getIndexName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
treeDataflowHelperFactories.add(indexDataflowHelperFactory);
@@ -546,11 +541,9 @@ public class ExternalIndexingOperations {
DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
- boolean temp = ds.getDatasetDetails().isTemp();
-
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds,
+ IndexingConstants.getFilesIndexName(ds.getDatasetName()));
IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -565,8 +558,7 @@ public class ExternalIndexingOperations {
for (Index index : indexes) {
if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- index.getIndexName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
treeDataflowHelperFactories.add(indexDataflowHelperFactory);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index c6e0a6b..701d0d6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -101,10 +101,8 @@ public class IndexUtil {
throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification();
IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
- boolean temp = dataset.getDatasetDetails().isTemp();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
- index.getIndexName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
ARecordType recordType =
@@ -153,11 +151,9 @@ public class IndexUtil {
public static JobSpecification buildDropSecondaryIndexJobSpec(Index index, MetadataProvider metadataProvider,
Dataset dataset) throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification();
- boolean temp = dataset.getDatasetDetails().isTemp();
IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
- index.getIndexName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
ARecordType recordType =
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index f7e569c..d731603 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -190,8 +190,7 @@ public abstract class SecondaryIndexOperationsHelper {
metaSerde =
metaType == null ? null : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
- index.getIndexName(), dataset.getDatasetDetails().isTemp());
+ metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
@@ -203,8 +202,7 @@ public abstract class SecondaryIndexOperationsHelper {
numFilterFields = 0;
}
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(),
- dataset.getDatasetName(), dataset.getDatasetName(), dataset.getDatasetDetails().isTemp());
+ metadataProvider.getSplitProviderAndConstraints(dataset);
primaryFileSplitProvider = primarySplitsAndConstraint.first;
primaryPartitionConstraint = primarySplitsAndConstraint.second;
setPrimaryRecDescAndComparators();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index 2fae304..190a3b2 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -71,6 +71,7 @@ import org.apache.hyracks.util.string.UTF8StringWriter;
public class ARecordPointable extends AbstractPointable {
private final UTF8StringWriter utf8Writer = new UTF8StringWriter();
+ public static final ARecordPointableFactory FACTORY = new ARecordPointableFactory();
public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
private static final long serialVersionUID = 1L;
@@ -86,11 +87,15 @@ public class ARecordPointable extends AbstractPointable {
}
};
- public static final IPointableFactory FACTORY = new IPointableFactory() {
+ public static class ARecordPointableFactory implements IPointableFactory {
+
private static final long serialVersionUID = 1L;
+ private ARecordPointableFactory() {
+ }
+
@Override
- public IPointable createPointable() {
+ public ARecordPointable createPointable() {
return new ARecordPointable();
}
@@ -98,7 +103,8 @@ public class ARecordPointable extends AbstractPointable {
public ITypeTraits getTypeTraits() {
return TYPE_TRAITS;
}
- };
+
+ }
public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = new IObjectFactory<IPointable, ATypeTag>() {
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index dd7335a..042837b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -22,7 +22,6 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
@@ -65,7 +64,7 @@ public class ReportMaxResourceIdMessage implements IApplicationMessage {
((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 2fedcca..8739948 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -20,7 +20,6 @@ package org.apache.asterix.runtime.message;
import java.util.Set;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.transactions.IResourceIdManager;
@@ -57,7 +56,7 @@ public class ResourceIdRequestMessage implements IApplicationMessage {
}
broker.sendApplicationMessageToNC(reponse, src);
} catch (Exception e) {
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 6869523..037945a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -56,6 +56,9 @@ import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
@@ -83,13 +86,17 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
private final int filterFieldIndex;
private final int metaFieldIndex;
private LockThenSearchOperationCallback searchCallback;
+ private IFrameOperationCallback frameOpCallback;
+ private final IFrameOperationCallbackFactory frameOpCallbackFactory;
- public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
- ARecordType recordType, int filterFieldIndex) throws HyracksDataException {
+ public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+ int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
+ ARecordType recordType, int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory)
+ throws HyracksDataException {
super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
this.key = new PermutingFrameTupleReference();
this.numOfPrimaryKeys = numOfPrimaryKeys;
+ this.frameOpCallbackFactory = frameOpCallbackFactory;
missingWriter = opDesc.getMissingWriterFactory().createMissingWriter();
int[] searchKeyPermutations = new int[numOfPrimaryKeys];
for (int i = 0; i < searchKeyPermutations.length; i++) {
@@ -104,7 +111,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
isFiltered = true;
this.recordType = recordType;
this.presetFieldIndex = filterFieldIndex;
- this.recPointable = (ARecordPointable) ARecordPointable.FACTORY.createPointable();
+ this.recPointable = ARecordPointable.FACTORY.createPointable();
this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
}
@@ -140,17 +147,19 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
dos = tb.getDataOutput();
appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
- modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
- indexHelper.getResource(), ctx, this);
+ modCallback = opDesc.getModificationOpCallbackFactory()
+ .createModificationOperationCallback(indexHelper.getResource(), ctx, this);
searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory()
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
indexAccessor = index.createAccessor(modCallback, searchCallback);
cursor = indexAccessor.createSearchCursor(false);
frameTuple = new FrameTupleReference();
- IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
+ IAppRuntimeContext runtimeCtx =
+ (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
runtimeCtx.getTransactionSubsystem().getLogManager());
+ frameOpCallback =
+ frameOpCallbackFactory.createFrameOperationCallback(ctx, (ILSMIndexAccessor) indexAccessor);
} catch (Exception e) {
indexHelper.close();
throw new HyracksDataException(e);
@@ -188,7 +197,6 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
tb.addFieldEndOffset();
}
- //TODO: use tryDelete/tryInsert in order to prevent deadlocks
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
@@ -221,8 +229,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
}
// if with filters, append the filter
if (isFiltered) {
- dos.write(prevTuple.getFieldData(filterFieldIndex),
- prevTuple.getFieldStart(filterFieldIndex),
+ dos.write(prevTuple.getFieldData(filterFieldIndex), prevTuple.getFieldStart(filterFieldIndex),
prevTuple.getFieldLength(filterFieldIndex));
tb.addFieldEndOffset();
}
@@ -258,6 +265,8 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
writeOutput(i, recordWasInserted, prevTuple != null);
i++;
}
+ // callback here before calling nextFrame on the next operator
+ frameOpCallback.frameCompleted(!firstModification);
appender.write(writer, true);
} catch (IndexException | IOException | AsterixException e) {
throw new HyracksDataException(e);
@@ -318,6 +327,6 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
@Override
public void flush() throws HyracksDataException {
- writer.flush();
+ // No op since nextFrame flushes by default
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
index fe69a04..b37ecae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
@@ -37,12 +37,14 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.common.IStorageManager;
public class LSMTreeUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperatorDescriptor {
private static final long serialVersionUID = 1L;
private final int[] prevValuePermutation;
+ private final IFrameOperationCallbackFactory frameOpCallbackFactory;
private ARecordType type;
private int filterIndex = -1;
@@ -54,12 +56,13 @@ public class LSMTreeUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperator
boolean isPrimary, String indexName, IMissingWriterFactory missingWriterFactory,
IModificationOperationCallbackFactory modificationOpCallbackProvider,
ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation,
- IPageManagerFactory pageManagerFactory) {
+ IPageManagerFactory pageManagerFactory, IFrameOperationCallbackFactory frameOpCallbackFactory) {
super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, IndexOperation.UPSERT,
dataflowHelperFactory, tupleFilterFactory, isPrimary, indexName, missingWriterFactory,
modificationOpCallbackProvider, searchOpCallbackProvider, pageManagerFactory);
this.prevValuePermutation = prevValuePermutation;
+ this.frameOpCallbackFactory = frameOpCallbackFactory;
}
@Override
@@ -67,7 +70,7 @@ public class LSMTreeUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperator
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return isPrimary()
? new LSMPrimaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
- recordDescProvider, comparatorFactories.length, type, filterIndex)
+ recordDescProvider, comparatorFactories.length, type, filterIndex, frameOpCallbackFactory)
: new LSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
recordDescProvider, prevValuePermutation);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 33078ff..90f6bbf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -32,7 +32,7 @@ import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,7 +46,7 @@ import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
- private final static long SEED = 0L;
+ protected static final long SEED = 0L;
protected final ITransactionManager transactionManager;
protected final ILogManager logMgr;
@@ -85,8 +85,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
try {
transactionContext = transactionManager.getTransactionContext(jobId, false);
transactionContext.setWriteTxn(isWriteTransaction);
- ILogMarkerCallback callback =
- TaskUtil.<ILogMarkerCallback>get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+ ILogMarkerCallback callback = TaskUtil.get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
logRecord = new LogRecord(callback);
if (isSink) {
return;
@@ -112,6 +111,8 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
* active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing
* flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too.
*/
+ // TODO: Fix this for upserts. An upsert tuple currently expects to notify the opTracker twice (once for
+ // the delete and once for the insert).
transactionContext.notifyOptracker(false);
} else {
tRef.reset(tAccess, t);
@@ -126,7 +127,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
}
}
}
- VSizeFrame message = TaskUtil.<VSizeFrame>get(HyracksConstants.KEY_MESSAGE, ctx);
+ IFrame message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
if (message != null
&& MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
try {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 536e657..cfe2a25 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -29,14 +29,14 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
private static final long serialVersionUID = 1L;
- private final JobId jobId;
- private final int datasetId;
- private final int[] primaryKeyFields;
- private final boolean isTemporaryDatasetWriteJob;
- private final boolean isWriteTransaction;
- private final int upsertVarIdx;
- private int[] datasetPartitions;
- private final boolean isSink;
+ protected final JobId jobId;
+ protected final int datasetId;
+ protected final int[] primaryKeyFields;
+ protected final boolean isTemporaryDatasetWriteJob;
+ protected final boolean isWriteTransaction;
+ protected final int upsertVarIdx;
+ protected int[] datasetPartitions;
+ protected final boolean isSink;
public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
index 06538af..80dd19b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
@@ -43,6 +43,7 @@ public class VSizeFrame implements IFrame {
buffer = ctx.allocateFrame(frameSize);
}
+ @Override
public ByteBuffer getBuffer() {
return buffer;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 6c581f0..4c0eb1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -28,8 +28,10 @@ import org.apache.hyracks.api.util.ErrorMessageUtil;
*/
public class HyracksDataException extends HyracksException {
+ private static final long serialVersionUID = 1L;
+
public static HyracksDataException create(Throwable cause) {
- if (cause instanceof HyracksDataException) {
+ if (cause instanceof HyracksDataException || cause == null) {
return (HyracksDataException) cause;
}
return new HyracksDataException(cause);
@@ -48,6 +50,14 @@ public class HyracksDataException extends HyracksException {
.getParams());
}
+ public static HyracksDataException suppress(HyracksDataException root, Throwable th) {
+ if (root == null) {
+ return HyracksDataException.create(th);
+ }
+ root.addSuppressed(th);
+ return root;
+ }
+
public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
Serializable... params) {
super(component, errorCode, message, cause, nodeId, params);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 21b9dcf..77404b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -463,4 +463,8 @@ public class ClusterControllerService implements IControllerService {
return CCApplicationEntryPoint.INSTANCE;
}
}
+
+ public ICCApplicationEntryPoint getApplication() {
+ return aep;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
index 05417a8..77f18ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
@@ -25,6 +25,19 @@ public abstract class AbstractPointable implements IPointable {
protected int length;
+ /**
+ * Copies the content of this pointable into the passed byte array.
+ * The array is expected to be at least as long as this pointable (array length >= length).
+ *
+ * @param copy
+ * the array to write into
+ * @throws ArrayIndexOutOfBoundsException
+ * if the passed array size is smaller than length
+ */
+ public void copyInto(byte[] copy) {
+ System.arraycopy(bytes, start, copy, 0, length);
+ }
+
@Override
public void set(byte[] bytes, int start, int length) {
this.bytes = bytes;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
index 74ced4f..2e8071c 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
@@ -18,8 +18,27 @@
*/
package org.apache.hyracks.data.std.api;
+/**
+ * Points to a range over a byte array
+ */
public interface IPointable extends IValueReference {
- public void set(byte[] bytes, int start, int length);
+ /**
+ * Point to the range from position = start with length = length over the byte array bytes
+ *
+ * @param bytes
+ * the byte array
+ * @param start
+ * the start offset
+ * @param length
+ * the length of the range
+ */
+ void set(byte[] bytes, int start, int length);
- public void set(IValueReference pointer);
+ /**
+ * Point to the same range pointed to by the passed pointer
+ *
+ * @param pointer
+ * the pointer to the targeted range
+ */
+ void set(IValueReference pointer);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
index ee00163..51c155e 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
@@ -20,10 +20,10 @@ package org.apache.hyracks.data.std.primitive;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.data.std.api.AbstractPointable;
-import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IPointableFactory;
public final class VoidPointable extends AbstractPointable {
+ public static final VoidPointableFactory FACTORY = new VoidPointableFactory();
public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
private static final long serialVersionUID = 1L;
@@ -38,11 +38,14 @@ public final class VoidPointable extends AbstractPointable {
}
};
- public static final IPointableFactory FACTORY = new IPointableFactory() {
+ public static class VoidPointableFactory implements IPointableFactory {
private static final long serialVersionUID = 1L;
+ private VoidPointableFactory() {
+ }
+
@Override
- public IPointable createPointable() {
+ public VoidPointable createPointable() {
return new VoidPointable();
}
@@ -50,5 +53,5 @@ public final class VoidPointable extends AbstractPointable {
public ITypeTraits getTypeTraits() {
return TYPE_TRAITS;
}
- };
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index 57f8072..efdd963 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -108,4 +108,11 @@ public class AbstractFrameAppender implements IFrameAppender {
return false;
}
+ @Override
+ public void flush(IFrameWriter writer) throws HyracksDataException {
+ if (tupleCount > 0) {
+ write(writer, true);
+ }
+ writer.flush();
+ }
}