You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2023/09/27 16:25:55 UTC
[asterixdb] branch master updated: [ASTERIXDB-3259][MTD] Include 'database' in DataSourceId & EntityId
This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2515b67aef [ASTERIXDB-3259][MTD] Include 'database' in DataSourceId & EntityId
2515b67aef is described below
commit 2515b67aef03afa96c06c4f3d758545ee1bc52d8
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Tue Sep 26 19:01:10 2023 -0700
[ASTERIXDB-3259][MTD] Include 'database' in DataSourceId & EntityId
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Include 'database' in DataSourceId, DataSourceIndex, and EntityId.
- Change 'Feed' tuple translator to handle 'database' value and
pass it to EntityId.
Change-Id: Icf67d0950810fe0706e16e5bf27f2f9ceda703c6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17813
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <al...@gmail.com>
Reviewed-by: Ali Alsuliman <al...@gmail.com>
Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
.../java/org/apache/asterix/active/EntityId.java | 18 ++++++++++------
.../IntroduceSecondaryIndexInsertDeleteRule.java | 4 ++--
.../rules/SetAsterixPhysicalOperatorsRule.java | 4 ++--
.../optimizer/rules/am/AccessMethodUtils.java | 5 +++--
.../pushdown/visitor/PushdownOperatorVisitor.java | 4 +++-
.../translator/LangExpressionToPlanTranslator.java | 2 +-
.../asterix/app/function/DatasetRewriter.java | 3 ++-
.../apache/asterix/app/function/FeedRewriter.java | 4 ++--
.../asterix/app/function/QueryIndexDatasource.java | 2 +-
.../asterix/app/translator/QueryTranslator.java | 9 ++++----
.../test/active/ActiveEventsListenerTest.java | 4 ++--
.../asterix/test/active/ActiveStatsTest.java | 6 ++++--
.../apache/asterix/external/feed/api/IFeed.java | 2 ++
.../external/feed/management/FeedConnectionId.java | 4 ++--
.../operators/FeedIntakeOperatorDescriptor.java | 6 ++++--
.../external/feed/test/InputHandlerTest.java | 4 +++-
.../asterix/metadata/declared/DataSourceId.java | 25 +++++++++++++++-------
.../asterix/metadata/declared/DataSourceIndex.java | 6 ++++--
.../asterix/metadata/declared/FeedDataSource.java | 4 ++--
.../metadata/declared/FunctionDataSource.java | 4 +++-
.../metadata/declared/LoadableDataSource.java | 6 ++++--
.../metadata/declared/MetadataProvider.java | 2 +-
.../metadata/declared/SampleDataSource.java | 3 ++-
.../org/apache/asterix/metadata/entities/Feed.java | 10 +++++++--
.../asterix/metadata/entities/FeedConnection.java | 2 +-
.../FeedTupleTranslator.java | 22 ++++++++++++++++++-
26 files changed, 113 insertions(+), 52 deletions(-)
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
index 1d33961768..fa82d3744b 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
@@ -28,18 +28,24 @@ import org.apache.asterix.common.metadata.DataverseName;
*/
public class EntityId implements Serializable {
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
private final String extensionName;
+ private final String databaseName;
private final DataverseName dataverseName;
private final String entityName;
- public EntityId(String extentionName, DataverseName dataverseName, String entityName) {
- this.extensionName = extentionName;
+ public EntityId(String extensionName, String databaseName, DataverseName dataverseName, String entityName) {
+ this.extensionName = extensionName;
+ this.databaseName = databaseName;
this.dataverseName = dataverseName;
this.entityName = entityName;
}
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
public DataverseName getDataverseName() {
return dataverseName;
}
@@ -57,13 +63,13 @@ public class EntityId implements Serializable {
return true;
}
EntityId other = (EntityId) o;
- return Objects.equals(other.dataverseName, dataverseName) && Objects.equals(other.entityName, entityName)
- && Objects.equals(other.extensionName, extensionName);
+ return Objects.equals(other.databaseName, databaseName) && Objects.equals(other.dataverseName, dataverseName)
+ && Objects.equals(other.entityName, entityName) && Objects.equals(other.extensionName, extensionName);
}
@Override
public int hashCode() {
- return Objects.hash(dataverseName, entityName, extensionName);
+ return Objects.hash(databaseName, dataverseName, entityName, extensionName);
}
@Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 25c97ccd4e..58f85eed01 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -366,7 +366,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
context.getOutputTypeEnvironment(currentTop),
index.getIndexDetails().isOverridingKeyFieldTypes());
}
- DataSourceIndex dataSourceIndex = new DataSourceIndex(index, dataverseName, datasetName, mp);
+ DataSourceIndex dataSourceIndex = new DataSourceIndex(index, database, dataverseName, datasetName, mp);
// Introduce the TokenizeOperator only when doing bulk-load,
// and index type is keyword or n-gram.
@@ -623,7 +623,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
beforeOpFilterExpression = createAnyUnknownFilterExpression(originalKeyVarList,
context.getOutputTypeEnvironment(originalAssignCoordinates), forceFilter);
}
- DataSourceIndex dataSourceIndex = new DataSourceIndex(index, dataverseName, datasetName, mp);
+ DataSourceIndex dataSourceIndex = new DataSourceIndex(index, database, dataverseName, datasetName, mp);
indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
OperatorManipulationUtil
.cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 90c5fadeb5..0949c44cb7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -260,9 +260,9 @@ public final class SetAsterixPhysicalOperatorsRule extends SetAlgebricksPhysical
AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
jobGenParams.readFromFuncArgs(f.getArguments());
MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
- DataSourceId dataSourceId =
- new DataSourceId(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
String database = MetadataUtil.resolveDatabase(null, jobGenParams.getDataverseName());
+ DataSourceId dataSourceId =
+ new DataSourceId(database, jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
Dataset dataset = mp.findDataset(database, jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
IDataSourceIndex<String, DataSourceId> dsi =
mp.findDataSourceIndex(jobGenParams.getIndexName(), dataSourceId);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
index defb383377..67c8423bef 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -1001,7 +1001,7 @@ public class AccessMethodUtils {
/**
* In case of a left outer join we look for a special GroupBy above the join operator
- * (see {@link IntroduceJoinAccessMethodRule#checkAndApplyJoinTransformation(Mutable, IOptimizationContext)}.
+ * (see {@link IntroduceJoinAccessMethodRule#checkAndApplyJoinTransformation(Mutable, IOptimizationContext, boolean)}.
* A "Special GroupBy" is a GroupBy that eliminates unjoined duplicates that might be produced by the secondary
* index probe. We probe secondary indexes on each index partition and return a tuple with a right branch variable
* set to MISSING (or NULL) if there's no match on that partition. Therefore if there's more than one partition
@@ -2049,7 +2049,8 @@ public class AccessMethodUtils {
unnestOp.setExecutionMode(ExecutionMode.PARTITIONED);
//set the physical operator
- DataSourceId dataSourceId = new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName());
+ DataSourceId dataSourceId =
+ new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName());
unnestOp.setPhysicalOperator(new ExternalDataLookupPOperator(dataSourceId, dataset, recordType, primaryKeyVars,
false, retainInput, retainNull));
return unnestOp;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
index a751aaf50f..a6f9005577 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
@@ -231,7 +231,9 @@ public class PushdownOperatorVisitor implements ILogicalOperatorVisitor<Void, Vo
return null;
}
- DataSourceId dsid = new DataSourceId(DataverseName.createFromCanonicalForm(dataverse), dataset);
+ DataverseName dataverseName = DataverseName.createFromCanonicalForm(dataverse);
+ String database = MetadataUtil.resolveDatabase(null, dataverseName);
+ DataSourceId dsid = new DataSourceId(database, dataverseName, dataset);
MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
return metadataProvider.findDataSource(dsid);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 09d57926a0..88be2d8d1a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -734,7 +734,7 @@ abstract class LangExpressionToPlanTranslator
throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
"Cannot write output to an external " + dataset());
}
- DataSourceId sourceId = new DataSourceId(dataverseName, datasetName);
+ DataSourceId sourceId = new DataSourceId(database, dataverseName, datasetName);
String itemTypeDatabase = MetadataUtil.resolveDatabase(null, dataset.getItemTypeDataverseName());
String metaItemTypeDatabase = MetadataUtil.resolveDatabase(null, dataset.getMetaItemTypeDataverseName());
IAType itemType = metadataProvider.findType(itemTypeDatabase, dataset.getItemTypeDataverseName(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
index d6abf554b9..bbdde8851a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
@@ -99,7 +99,8 @@ public class DatasetRewriter implements IFunctionToDataSourceRewriter, IResultTy
}
variables.add(unnest.getVariable());
- DataSourceId dsid = new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName());
+ DataSourceId dsid =
+ new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName());
DataSource dataSource = metadataProvider.findDataSource(dsid);
boolean hasMeta = dataSource.hasMeta();
if (hasMeta) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
index 035765fbc8..66d8190bb2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -85,9 +85,9 @@ public class FeedRewriter implements IFunctionToDataSourceRewriter, IResultTypeC
String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4);
String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
- DataSourceId asid = new DataSourceId(dataverseName, getTargetFeed);
- String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
String database = MetadataUtil.resolveDatabase(null, dataverseName);
+ DataSourceId asid = new DataSourceId(database, dataverseName, getTargetFeed);
+ String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
FeedPolicyEntity policy = metadataProvider.findFeedPolicy(database, dataverseName, policyName);
if (policy == null) {
policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index 5178c3ef1d..b3889fd8ed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -135,7 +135,7 @@ public class QueryIndexDatasource extends FunctionDataSource {
}
private static DataSourceId createQueryIndexDataSourceId(Dataset dataset, String indexName) {
- return new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName(),
+ return new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(),
new String[] { indexName, QueryIndexRewriter.QUERY_INDEX.getName() });
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 574448d7b2..17988cf36f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3872,7 +3872,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
Map<String, String> configuration = cfs.getConfiguration();
ExternalDataUtils.normalize(configuration);
ExternalDataUtils.validate(configuration);
- feed = new Feed(dataverseName, feedName, configuration);
+ feed = new Feed(database, dataverseName, feedName, configuration);
FeedMetadataUtil.validateFeed(feed, mdTxnCtx, appCtx, warningCollector);
MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -4060,7 +4060,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
try {
metadataProvider.setMetadataTxnContext(mdTxnCtx);
// Runtime handler
- EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName);
+ EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, feedName);
// Feed & Feed Connections
Feed feed = FeedMetadataUtil.validateIfFeedExists(database, dataverseName, feedName,
metadataProvider.getMetadataTxnContext());
@@ -4106,8 +4106,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
StopFeedStatement sfst = (StopFeedStatement) stmt;
SourceLocation sourceLoc = sfst.getSourceLocation();
DataverseName dataverseName = getActiveDataverseName(sfst.getDataverseName());
+ String database = MetadataUtil.resolveDatabase(null, dataverseName);
String feedName = sfst.getFeedName().getValue();
- EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName);
+ EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, feedName);
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
// Obtain runtime info from ActiveListener
@@ -4208,7 +4209,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
// Check whether feed is alive
ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler
- .getListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName));
+ .getListener(new EntityId(Feed.EXTENSION_NAME, database, dataverseName, feedName));
if (listener != null && listener.isActive()) {
throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, sourceLoc,
feedName);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 93da8c518d..baecf79d1f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -86,7 +86,7 @@ public class ActiveEventsListenerTest {
static DataverseName dataverseName = MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME;
static String database = MetadataUtil.databaseFor(dataverseName);
static String entityName = "entityName";
- static EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, entityName);
+ static EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, entityName);
static Dataset firstDataset;
static Dataset secondDataset;
static List<Dataset> allDatasets;
@@ -1523,7 +1523,7 @@ public class ActiveEventsListenerTest {
TestEventsListener[] additionalListeners = new TestEventsListener[3];
for (int i = 0; i < additionalListeners.length; i++) {
String entityName = "entityName" + i;
- EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, entityName);
+ EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, entityName);
ClusterControllerService ccService = Mockito.mock(ClusterControllerService.class);
CCServiceContext ccServiceCtx = Mockito.mock(CCServiceContext.class);
CcApplicationContext ccAppCtx = Mockito.mock(CcApplicationContext.class);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index cb123bfb92..e17a7fe374 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -42,6 +42,7 @@ import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.app.result.ResponsePrinter;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -74,8 +75,9 @@ public class ActiveStatsTest {
@Test
public void refreshStatsTest() throws Exception {
// Entities to be used
- EntityId entityId =
- new EntityId("MockExtension", DataverseName.createSinglePartName("MockDataverse"), "MockEntity");
+ DataverseName mockDataverse = DataverseName.createSinglePartName("MockDataverse");
+ String mockDatabase = MetadataUtil.resolveDatabase(null, mockDataverse);
+ EntityId entityId = new EntityId("MockExtension", mockDatabase, mockDataverse, "MockEntity");
ActiveRuntimeId activeRuntimeId =
new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), 0);
List<Dataset> datasetList = new ArrayList<>();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
index dd1d1b73d6..de3fa5744d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
@@ -28,6 +28,8 @@ public interface IFeed extends Serializable {
public String getFeedName();
+ public String getDatabaseName();
+
public DataverseName getDataverseName();
public EntityId getFeedId();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
index 73edc6e1d6..a2f3b1087f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
@@ -42,8 +42,8 @@ public class FeedConnectionId implements Serializable {
this.hash = toString().hashCode();
}
- public FeedConnectionId(DataverseName dataverseName, String feedName, String datasetName) {
- this(new EntityId(FEED_EXTENSION_NAME, dataverseName, feedName), datasetName);
+ public FeedConnectionId(String databaseName, DataverseName dataverseName, String feedName, String datasetName) {
+ this(new EntityId(FEED_EXTENSION_NAME, databaseName, dataverseName, feedName), datasetName);
}
public EntityId getFeedId() {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 0647763d04..dbc571496c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -85,7 +85,8 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, ITypedAdapterFactory adapterFactory,
ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) {
super(spec, 0, 1);
- this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+ this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDatabaseName(), primaryFeed.getDataverseName(),
+ primaryFeed.getFeedName());
this.adaptorFactory = adapterFactory;
this.adapterOutputType = adapterOutputType;
this.policyAccessor = policyAccessor;
@@ -96,7 +97,8 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
String adapterLibraryName, String adapterFactoryClassName, ARecordType adapterOutputType,
FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) {
super(spec, 0, 1);
- this.feedId = new EntityId(FEED_EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName());
+ this.feedId =
+ new EntityId(FEED_EXTENSION_NAME, feed.getDatabaseName(), feed.getDataverseName(), feed.getFeedName());
this.adaptorFactoryClassName = adapterFactoryClassName;
this.adaptorLibraryDataverse = adapterLibraryDataverse;
this.adaptorLibraryName = adapterLibraryName;
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index 30d32668cd..a72c1bf5ac 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -34,6 +34,7 @@ import org.apache.asterix.active.EntityId;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -72,8 +73,9 @@ public class InputHandlerTest {
private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer,
FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException, AsterixException {
DataverseName dvName = DataverseName.createSinglePartName(DATAVERSE_NAME);
+ String dbName = MetadataUtil.databaseFor(dvName);
FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class);
- EntityId feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dvName, FEED);
+ EntityId feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dbName, dvName, FEED);
FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET);
ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.COLLECT.toString(), 0);
return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, fpa, fta, framePool);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java
index c27ac420fa..b60682c2af 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java
@@ -26,6 +26,8 @@ import org.apache.asterix.common.metadata.DataverseName;
public final class DataSourceId {
+ private final String databaseName;
+
private final DataverseName dataverseName;
private final String datasourceName;
@@ -35,13 +37,15 @@ public final class DataSourceId {
/**
* The original constructor taking
*
+ * @param databaseName
+ * the database name
* @param dataverseName
- * the dataverse (namespace) for this datasource
+ * the dataverse (namespace) for this datasource
* @param datasourceName
- * the name for this datasource
+ * the name for this datasource
*/
- public DataSourceId(DataverseName dataverseName, String datasourceName) {
- this(dataverseName, datasourceName, null);
+ public DataSourceId(String databaseName, DataverseName dataverseName, String datasourceName) {
+ this(databaseName, dataverseName, datasourceName, null);
}
/**
@@ -50,7 +54,8 @@ public final class DataSourceId {
* that would expose different behavior. It enables the definition of (compile-time) parameterized datasources.
* Please note that the first 2 parameters still need to be 1) a dataverse name and 2) a datasource name.
*/
- public DataSourceId(DataverseName dataverseName, String datasourceName, String[] parameters) {
+ public DataSourceId(String databaseName, DataverseName dataverseName, String datasourceName, String[] parameters) {
+ this.databaseName = databaseName;
this.dataverseName = dataverseName;
this.datasourceName = datasourceName;
this.parameters = parameters;
@@ -61,6 +66,10 @@ public final class DataSourceId {
return dataverseName + "." + datasourceName + (parameters != null ? "." + String.join(".", parameters) : "");
}
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
public DataverseName getDataverseName() {
return dataverseName;
}
@@ -78,13 +87,13 @@ public final class DataSourceId {
return false;
}
DataSourceId that = (DataSourceId) o;
- return dataverseName.equals(that.dataverseName) && datasourceName.equals(that.datasourceName)
- && Arrays.equals(parameters, that.parameters);
+ return Objects.equals(databaseName, that.databaseName) && dataverseName.equals(that.dataverseName)
+ && datasourceName.equals(that.datasourceName) && Arrays.equals(parameters, that.parameters);
}
@Override
public int hashCode() {
- int result = Objects.hash(dataverseName, datasourceName);
+ int result = Objects.hash(databaseName, dataverseName, datasourceName);
result = 31 * result + Arrays.hashCode(parameters);
return result;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java
index 05498b9e5e..80ca03690d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java
@@ -27,15 +27,17 @@ import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
public class DataSourceIndex implements IDataSourceIndex<String, DataSourceId> {
private final Index index;
+ private final String database;
private final DataverseName dataverseName;
private final String datasetName;
private final MetadataProvider metadataProvider;
// Every transactions needs to work with its own instance of an
// MetadataProvider.
- public DataSourceIndex(Index index, DataverseName dataverseName, String datasetName,
+ public DataSourceIndex(Index index, String database, DataverseName dataverseName, String datasetName,
MetadataProvider metadataProvider) {
this.index = index;
+ this.database = database;
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.metadataProvider = metadataProvider;
@@ -45,7 +47,7 @@ public class DataSourceIndex implements IDataSourceIndex<String, DataSourceId> {
@Override
public IDataSource<DataSourceId> getDataSource() {
try {
- DataSourceId sourceId = new DataSourceId(dataverseName, datasetName);
+ DataSourceId sourceId = new DataSourceId(database, dataverseName, datasetName);
return metadataProvider.lookupSourceInMetadata(sourceId);
} catch (Exception me) {
return null;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index d975404dd4..628c745666 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -191,8 +191,8 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
throw new AlgebricksException("Feed not configured with a policy");
}
feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
- FeedConnectionId feedConnectionId =
- new FeedConnectionId(getId().getDataverseName(), getId().getDatasourceName(), getTargetDataset());
+ FeedConnectionId feedConnectionId = new FeedConnectionId(getId().getDatabaseName(),
+ getId().getDataverseName(), getId().getDatasourceName(), getTargetDataset());
FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 690338c13c..683bf0f5d9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.MetadataUtil;
import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
@@ -135,6 +136,7 @@ public abstract class FunctionDataSource extends DataSource {
}
protected static DataSourceId createDataSourceId(FunctionIdentifier fid, String... parameters) {
- return new DataSourceId(FunctionSignature.getDataverseName(fid), fid.getName(), parameters);
+ return new DataSourceId(MetadataUtil.resolveDatabase(null, FunctionSignature.getDataverseName(fid)),
+ FunctionSignature.getDataverseName(fid), fid.getName(), parameters);
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index 5fa2af4cc8..20505bb2f1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.metadata.entities.Dataset;
@@ -64,8 +65,9 @@ public class LoadableDataSource extends DataSource {
public LoadableDataSource(Dataset targetDataset, IAType itemType, IAType metaItemType, String adapter,
Map<String, String> properties) throws AlgebricksException, IOException {
- super(new DataSourceId(DataverseName.createSinglePartName(LOADABLE_DV), LOADABLE_DS), itemType, metaItemType,
- Type.LOADABLE, null);
+ super(new DataSourceId(MetadataUtil.databaseFor(DataverseName.createSinglePartName(LOADABLE_DV)),
+ DataverseName.createSinglePartName(LOADABLE_DV), LOADABLE_DS), itemType, metaItemType, Type.LOADABLE,
+ null);
this.targetDataset = targetDataset;
this.adapter = adapter;
this.adapterProperties = properties;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index a900366699..3a9f6c1065 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -434,7 +434,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
String database = dataset.getDatabaseName();
String datasetName = dataset.getDatasetName();
Index index = getIndex(database, dataverseName, datasetName, indexId);
- return index != null ? new DataSourceIndex(index, dataverseName, datasetName, this) : null;
+ return index != null ? new DataSourceIndex(index, database, dataverseName, datasetName, this) : null;
}
public Index getIndex(String database, DataverseName dataverseName, String datasetName, String indexName)
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
index 3b2937b60b..ff6cffad6c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
@@ -76,6 +76,7 @@ public class SampleDataSource extends DataSource {
}
private static DataSourceId createSampleDataSourceId(Dataset dataset, String sampleIndexName) {
- return new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName(), new String[] { sampleIndexName });
+ return new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(),
+ new String[] { sampleIndexName });
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java
index e78b7d46e9..5796df952c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java
@@ -41,8 +41,9 @@ public class Feed implements IMetadataEntity<Feed>, IFeed {
/** Feed configurations */
private final Map<String, String> feedConfiguration;
- public Feed(DataverseName dataverseName, String feedName, Map<String, String> feedConfiguration) {
- this.feedId = new EntityId(EXTENSION_NAME, dataverseName, feedName);
+ public Feed(String databaseName, DataverseName dataverseName, String feedName,
+ Map<String, String> feedConfiguration) {
+ this.feedId = new EntityId(EXTENSION_NAME, databaseName, dataverseName, feedName);
this.displayName = "(" + feedId + ")";
this.feedConfiguration = feedConfiguration;
}
@@ -52,6 +53,11 @@ public class Feed implements IMetadataEntity<Feed>, IFeed {
return feedId;
}
+ @Override
+ public String getDatabaseName() {
+ return feedId.getDatabaseName();
+ }
+
@Override
public DataverseName getDataverseName() {
return feedId.getDataverseName();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
index 06ffce498f..a0a288f6c9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
@@ -58,7 +58,7 @@ public class FeedConnection implements IMetadataEntity<FeedConnection> {
this.policyName = policyName;
this.whereClauseBody = whereClauseBody == null ? "" : whereClauseBody;
this.outputType = outputType;
- this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dataverseName, feedName);
+ this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, databaseName, dataverseName, feedName);
}
public List<FunctionSignature> getAppliedFunctions() {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
index bb1c9ad51d..0af8298c32 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
@@ -28,6 +28,7 @@ import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.builders.UnorderedListBuilder;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
import org.apache.asterix.metadata.bootstrap.FeedEntity;
import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
import org.apache.asterix.metadata.entities.Feed;
@@ -59,6 +60,13 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
String dataverseCanonicalName =
((AString) feedRecord.getValueByPos(feedEntity.dataverseNameIndex())).getStringValue();
DataverseName dataverseName = DataverseName.createFromCanonicalForm(dataverseCanonicalName);
+ int databaseNameIndex = feedEntity.databaseNameIndex();
+ String databaseName;
+ if (databaseNameIndex >= 0) {
+ databaseName = ((AString) feedRecord.getValueByPos(databaseNameIndex)).getStringValue();
+ } else {
+ databaseName = MetadataUtil.databaseFor(dataverseName);
+ }
String feedName = ((AString) feedRecord.getValueByPos(feedEntity.feedNameIndex())).getStringValue();
AUnorderedList feedConfig = (AUnorderedList) feedRecord.getValueByPos(feedEntity.adapterConfigIndex());
@@ -74,7 +82,7 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
adaptorConfiguration.put(key, value);
}
- return new Feed(dataverseName, feedName, adaptorConfiguration);
+ return new Feed(databaseName, dataverseName, feedName, adaptorConfiguration);
}
@Override
@@ -83,6 +91,12 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
// write the key in the first two fields of the tuple
tupleBuilder.reset();
+
+ if (feedEntity.databaseNameIndex() >= 0) {
+ aString.setValue(feed.getDatabaseName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+ }
aString.setValue(dataverseCanonicalName);
stringSerde.serialize(aString, tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
@@ -93,6 +107,12 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
recordBuilder.reset(feedEntity.getRecordType());
+ if (feedEntity.databaseNameIndex() >= 0) {
+ fieldValue.reset();
+ aString.setValue(feed.getDatabaseName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(feedEntity.databaseNameIndex(), fieldValue);
+ }
// write dataverse name
fieldValue.reset();
aString.setValue(dataverseCanonicalName);