You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:04 UTC
[07/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index 56fbcb5..ec727a9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -21,7 +21,6 @@ import java.rmi.RemoteException;
import java.util.List;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.metadata.MetadataException;
@@ -32,8 +31,6 @@ import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
@@ -497,16 +494,6 @@ public interface IMetadataNode extends Remote, Serializable {
public void addCompactionPolicy(JobId jobId, CompactionPolicy compactionPolicy) throws MetadataException,
RemoteException;
- public FeedActivity getRecentFeedActivity(JobId jobId, FeedConnectionId feedId,
- FeedActivityType... feedActivityFilter) throws MetadataException, RemoteException;
-
- /**
- * @param jobId
- * @throws MetadataException
- * @throws RemoteException
- */
- public void initializeFeedActivityIdFactory(JobId jobId) throws MetadataException, RemoteException;
-
/**
* @param jobId
* @param dataverse
@@ -559,16 +546,7 @@ public interface IMetadataNode extends Remote, Serializable {
*/
public void dropFeed(JobId jobId, String dataverse, String feedName) throws MetadataException, RemoteException;
- /**
- * @param jobId
- * A globally unique id for an active metadata transaction.
- * @param feedId
- * A unique id for the feed
- * @param feedActivity
- */
- public void registerFeedActivity(JobId jobId, FeedConnectionId feedId, FeedActivity feedActivity)
- throws MetadataException, RemoteException;
-
+
/**
* @param jobId
* @param feedPolicy
@@ -588,17 +566,7 @@ public interface IMetadataNode extends Remote, Serializable {
public FeedPolicy getFeedPolicy(JobId jobId, String dataverse, String policy) throws MetadataException,
RemoteException;
- /**
- * @param jobId
- * @param dataverse
- * @param dataset
- * @return
- * @throws MetadataException
- * @throws RemoteException
- */
- public List<FeedActivity> getActiveFeeds(JobId jobId, String dataverse, String dataset) throws MetadataException,
- RemoteException;
-
+
/**
* Removes a library , acquiring local locks on behalf of the given
* transaction id.
@@ -667,16 +635,28 @@ public interface IMetadataNode extends Remote, Serializable {
public List<Feed> getDataverseFeeds(JobId jobId, String dataverseName) throws MetadataException, RemoteException;
/**
+ * Deletes a given feed (ingestion) policy.
+ *
* @param jobId
* @param dataverseName
- * @param deedName
+ * @param policyName
+ * @return
+ * @throws RemoteException
+ * @throws MetadataException
+ */
+ public void dropFeedPolicy(JobId jobId, String dataverseName, String policyName) throws MetadataException,
+ RemoteException;
+
+ /**
+ * @param jobId
+ * @param dataverse
* @return
* @throws MetadataException
* @throws RemoteException
*/
- public List<FeedActivity> getDatasetsServedByFeed(JobId jobId, String dataverseName, String deedName)
- throws MetadataException, RemoteException;
-
+ public List<FeedPolicy> getDataversePolicies(JobId jobId, String dataverse) throws MetadataException,
+ RemoteException;
+
/**
* @param jobId
* A globally unique id for an active metadata transaction.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index c93c29e..0ac211e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -57,9 +57,9 @@ import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails.FileStructur
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
import edu.uci.ics.asterix.metadata.entities.Node;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
@@ -123,7 +123,7 @@ public class MetadataBootstrap {
MetadataPrimaryIndexes.INDEX_DATASET, MetadataPrimaryIndexes.NODE_DATASET,
MetadataPrimaryIndexes.NODEGROUP_DATASET, MetadataPrimaryIndexes.FUNCTION_DATASET,
MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, MetadataPrimaryIndexes.FEED_DATASET,
- MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, MetadataPrimaryIndexes.FEED_POLICY_DATASET,
+ MetadataPrimaryIndexes.FEED_POLICY_DATASET,
MetadataPrimaryIndexes.LIBRARY_DATASET, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET,
MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET };
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 7eae5cd..04e56b1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.metadata.bootstrap;
-import java.util.ArrayList;
import java.util.Arrays;
import edu.uci.ics.asterix.metadata.MetadataException;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 30ae9fa..b726ed6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -50,19 +50,21 @@ public final class MetadataRecordTypes {
public static ARecordType FUNCTION_RECORDTYPE;
public static ARecordType DATASOURCE_ADAPTER_RECORDTYPE;
public static ARecordType FEED_RECORDTYPE;
+ public static ARecordType PRIMARY_FEED_DETAILS_RECORDTYPE;
+ public static ARecordType SECONDARY_FEED_DETAILS_RECORDTYPE;
public static ARecordType FEED_ADAPTER_CONFIGURATION_RECORDTYPE;
public static ARecordType FEED_ACTIVITY_RECORDTYPE;
public static ARecordType FEED_POLICY_RECORDTYPE;
public static ARecordType POLICY_PARAMS_RECORDTYPE;
- public static ARecordType FEED_ACTIVITY_DETAILS_RECORDTYPE;
public static ARecordType LIBRARY_RECORDTYPE;
public static ARecordType COMPACTION_POLICY_RECORDTYPE;
public static ARecordType EXTERNAL_FILE_RECORDTYPE;
/**
* Create all metadata record types.
+ * @throws HyracksDataException
*/
- public static void init() throws MetadataException {
+ public static void init() throws MetadataException, HyracksDataException {
// Attention: The order of these calls is important because some types
// depend on other types being created first.
// These calls are one "dependency chain".
@@ -90,10 +92,10 @@ public final class MetadataRecordTypes {
FUNCTION_RECORDTYPE = createFunctionRecordType();
DATASOURCE_ADAPTER_RECORDTYPE = createDatasourceAdapterRecordType();
- FEED_RECORDTYPE = createFeedRecordType();
FEED_ADAPTER_CONFIGURATION_RECORDTYPE = createPropertiesRecordType();
- FEED_ACTIVITY_DETAILS_RECORDTYPE = createPropertiesRecordType();
- FEED_ACTIVITY_RECORDTYPE = createFeedActivityRecordType();
+ PRIMARY_FEED_DETAILS_RECORDTYPE = createPrimaryFeedDetailsRecordType();
+ SECONDARY_FEED_DETAILS_RECORDTYPE = createSecondaryFeedDetailsRecordType();
+ FEED_RECORDTYPE = createFeedRecordType();
FEED_POLICY_RECORDTYPE = createFeedPolicyRecordType();
LIBRARY_RECORDTYPE = createLibraryRecordType();
@@ -498,50 +500,67 @@ public final class MetadataRecordTypes {
public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX = 4;
public static final int FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX = 5;
public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 6;
-
- private static ARecordType createFeedActivityRecordType() throws AsterixException {
- AUnorderedListType unorderedPropertyListType = new AUnorderedListType(FEED_ACTIVITY_DETAILS_RECORDTYPE, null);
- String[] fieldNames = { "DataverseName", "FeedName", "DatasetName", "ActivityId", "ActivityType", "Details",
- "Timestamp" };
- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
- BuiltinType.ASTRING, unorderedPropertyListType, BuiltinType.ASTRING };
- try {
- return new ARecordType("FeedActivityRecordType", fieldNames, fieldTypes, true);
- } catch (HyracksDataException e) {
- throw new AsterixException(e);
- }
- }
+
public static final int FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
public static final int FEED_ARECORD_FEED_NAME_FIELD_INDEX = 1;
- public static final int FEED_ARECORD_ADAPTER_NAME_FIELD_INDEX = 2;
- public static final int FEED_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX = 3;
- public static final int FEED_ARECORD_FUNCTION_FIELD_INDEX = 4;
- public static final int FEED_ARECORD_TIMESTAMP_FIELD_INDEX = 5;
+ public static final int FEED_ARECORD_FUNCTION_FIELD_INDEX = 2;
+ public static final int FEED_ARECORD_FEED_TYPE_FIELD_INDEX = 3;
+ public static final int FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX = 4;
+ public static final int FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX = 5;
+ public static final int FEED_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
- private static ARecordType createFeedRecordType() throws AsterixException {
+
+ public static final int FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX = 0;
+ public static final int FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX = 1;
- AUnorderedListType unorderedAdapterPropertyListType = new AUnorderedListType(
- DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null);
+ public static final int FEED_ARECORD_SECONDARY_FIELD_DETAILS_SOURCE_FEED_NAME_FIELD_INDEX = 0;
+
+ private static ARecordType createFeedRecordType() throws AsterixException, HyracksDataException {
List<IAType> feedFunctionUnionList = new ArrayList<IAType>();
feedFunctionUnionList.add(BuiltinType.ANULL);
feedFunctionUnionList.add(BuiltinType.ASTRING);
AUnionType feedFunctionUnion = new AUnionType(feedFunctionUnionList, null);
- String[] fieldNames = { "DataverseName", "FeedName", "AdapterName", "AdapterConfiguration", "Function",
- "Timestamp" };
- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
- unorderedAdapterPropertyListType, feedFunctionUnion, BuiltinType.ASTRING };
+ List<IAType> primaryFeedTypeDetailsRecordUnionList = new ArrayList<IAType>();
+ primaryFeedTypeDetailsRecordUnionList.add(BuiltinType.ANULL);
+ primaryFeedTypeDetailsRecordUnionList.add(PRIMARY_FEED_DETAILS_RECORDTYPE);
+ AUnionType primaryRecordUnion = new AUnionType(primaryFeedTypeDetailsRecordUnionList, null);
- try {
- return new ARecordType("FeedRecordType", fieldNames, fieldTypes, true);
- } catch (HyracksDataException e) {
- throw new AsterixException(e);
- }
+ List<IAType> secondaryFeedTypeDetailsRecordUnionList = new ArrayList<IAType>();
+ secondaryFeedTypeDetailsRecordUnionList.add(BuiltinType.ANULL);
+ secondaryFeedTypeDetailsRecordUnionList.add(SECONDARY_FEED_DETAILS_RECORDTYPE);
+ AUnionType secondaryRecordUnion = new AUnionType(secondaryFeedTypeDetailsRecordUnionList, null);
+
+ String[] fieldNames = { "DataverseName", "FeedName", "Function", "FeedType", "PrimaryTypeDetails",
+ "SecondaryTypeDetails", "Timestamp" };
+ IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, feedFunctionUnion, BuiltinType.ASTRING,
+ primaryRecordUnion, secondaryRecordUnion, BuiltinType.ASTRING };
+ return new ARecordType("FeedRecordType", fieldNames, fieldTypes, true);
}
+ public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_NAME_FIELD_INDEX = 0;
+ public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX = 1;
+
+ private static final ARecordType createPrimaryFeedDetailsRecordType() throws AsterixException, HyracksDataException {
+ AUnorderedListType unorderedAdaptorPropertyListType = new AUnorderedListType(
+ DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null);
+
+ String[] fieldNames = { "AdapterName", "AdapterConfiguration" };
+ IAType[] fieldTypes = { BuiltinType.ASTRING, unorderedAdaptorPropertyListType };
+ return new ARecordType(null, fieldNames, fieldTypes, true);
+ }
+
+ public static final int FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX = 0;
+
+ private static final ARecordType createSecondaryFeedDetailsRecordType() throws AsterixException, HyracksDataException {
+ String[] fieldNames = { "SourceFeedName" };
+ IAType[] fieldTypes = { BuiltinType.ASTRING };
+ return new ARecordType(null, fieldNames, fieldTypes, true);
+ }
+
public static final int LIBRARY_ARECORD_DATAVERSENAME_FIELD_INDEX = 0;
public static final int LIBRARY_ARECORD_NAME_FIELD_INDEX = 1;
public static final int LIBRARY_ARECORD_TIMESTAMP_FIELD_INDEX = 2;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
index 6948dbc..037b04d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
@@ -16,8 +16,8 @@ package edu.uci.ics.asterix.metadata.cluster;
import java.util.concurrent.atomic.AtomicInteger;
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
public abstract class AbstractClusterManagementWork implements IClusterManagementWork {
@@ -35,6 +35,8 @@ public abstract class AbstractClusterManagementWork implements IClusterManagemen
this.workId = WorkIdGenerator.getNextWorkId();
}
+
+
private static class WorkIdGenerator {
private static AtomicInteger workId = new AtomicInteger(0);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
index 68dcc4c..1157562 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
@@ -1,27 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.metadata.cluster;
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+import java.util.Set;
+
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
public class AddNodeWork extends AbstractClusterManagementWork {
- private final int numberOfNodes;
+ private final int numberOfNodesRequested;
+ private final Set<String> deadNodes;
@Override
public WorkType getClusterManagementWorkType() {
return WorkType.ADD_NODE;
}
- public AddNodeWork(int numberOfNodes, IClusterEventsSubscriber subscriber) {
+ public AddNodeWork(Set<String> deadNodes, int numberOfNodesRequested, IClusterEventsSubscriber subscriber) {
super(subscriber);
- this.numberOfNodes = numberOfNodes;
+ this.deadNodes = deadNodes;
+ this.numberOfNodesRequested = numberOfNodesRequested;
+ }
+
+ public int getNumberOfNodesRequested() {
+ return numberOfNodesRequested;
}
- public int getNumberOfNodes() {
- return numberOfNodes;
+ public Set<String> getDeadNodes() {
+ return deadNodes;
}
@Override
public String toString() {
- return WorkType.ADD_NODE + " " + numberOfNodes + " requested by " + subscriber;
+ return WorkType.ADD_NODE + " " + numberOfNodesRequested + " requested by " + subscriber;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
index d578a77..7f3b575 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
@@ -1,6 +1,7 @@
package edu.uci.ics.asterix.metadata.cluster;
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
public class ClusterManagementWorkResponse implements IClusterManagementWorkResponse {
@@ -13,6 +14,7 @@ public class ClusterManagementWorkResponse implements IClusterManagementWorkResp
this.status = Status.IN_PROGRESS;
}
+
@Override
public IClusterManagementWork getWork() {
return work;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
index fe7f4a4..47ca953 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
@@ -25,6 +25,7 @@ import java.util.logging.Logger;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.event.management.AsterixEventServiceClient;
@@ -39,7 +40,6 @@ import edu.uci.ics.asterix.event.service.ILookupService;
import edu.uci.ics.asterix.event.service.ServiceProvider;
import edu.uci.ics.asterix.event.util.PatternCreator;
import edu.uci.ics.asterix.installer.schema.conf.Configuration;
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
import edu.uci.ics.asterix.metadata.api.IClusterManager;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
deleted file mode 100644
index dfc88ac..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package edu.uci.ics.asterix.metadata.cluster;
-
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
-
-public interface IClusterManagementWorkResponse {
-
- public enum Status {
- IN_PROGRESS,
- SUCCESS,
- FAILURE
- }
-
- /**
- * @return
- */
- public IClusterManagementWork getWork();
-
- /**
- * @return
- */
- public Status getStatus();
-
- /**
- * @param status
- */
- public void setStatus(Status status);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
index 90683d1..8b4f2aa 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
@@ -2,7 +2,7 @@ package edu.uci.ics.asterix.metadata.cluster;
import java.util.Set;
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
public class RemoveNodeWork extends AbstractClusterManagementWork {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index ef23135..5df2c95 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -43,8 +43,7 @@ import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartition
public abstract class AqlDataSource implements IDataSource<AqlSourceId> {
private final AqlSourceId id;
- private final String datasourceDataverse;
- private final String datasourceName;
+ private final IAType itemType;
private final AqlDataSourceType datasourceType;
protected IAType[] schemaTypes;
protected INodeDomain domain;
@@ -58,19 +57,18 @@ public abstract class AqlDataSource implements IDataSource<AqlSourceId> {
}
public AqlDataSource(AqlSourceId id, String datasourceDataverse, String datasourceName,
- AqlDataSourceType datasourceType) throws AlgebricksException {
+ IAType itemType, AqlDataSourceType datasourceType) throws AlgebricksException {
this.id = id;
- this.datasourceDataverse = datasourceDataverse;
- this.datasourceName = datasourceName;
+ this.itemType = itemType;
this.datasourceType = datasourceType;
}
public String getDatasourceDataverse() {
- return datasourceDataverse;
+ return id.getDataverseName();
}
public String getDatasourceName() {
- return datasourceName;
+ return id.getDatasourceName();
}
@Override
@@ -196,7 +194,10 @@ public abstract class AqlDataSource implements IDataSource<AqlSourceId> {
public Map<String, Serializable> getProperties() {
return properties;
}
-
+
+ public IAType getItemType() {
+ return itemType;
+ }
public void setProperties(Map<String, Serializable> properties) {
this.properties = properties;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 5055cea..0ec098d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -18,6 +18,7 @@ package edu.uci.ics.asterix.metadata.declared;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -37,7 +38,12 @@ import edu.uci.ics.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOp
import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.ICentralFeedManager;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
@@ -65,23 +71,19 @@ import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory.SupportedOperation;
import edu.uci.ics.asterix.metadata.external.IndexingConstants;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
-import edu.uci.ics.asterix.metadata.feeds.EndFeedMessage;
import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedCollectOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
-import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
-import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
@@ -187,6 +189,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
private boolean asyncResults;
private ResultSetId resultSetId;
private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
+ private final ICentralFeedManager centralFeedManager;
private final Dataverse defaultDataverse;
private JobId jobId;
@@ -213,10 +216,11 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
return config;
}
- public AqlMetadataProvider(Dataverse defaultDataverse) {
+ public AqlMetadataProvider(Dataverse defaultDataverse, ICentralFeedManager centralFeedManager) {
this.defaultDataverse = defaultDataverse;
this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ this.centralFeedManager = centralFeedManager;
}
public void setJobId(JobId jobId) {
@@ -330,10 +334,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
try {
switch (((AqlDataSource) dataSource).getDatasourceType()) {
- case FEED: {
- // loading data from a feed
- return buildFeedIntakeRuntime(jobSpec, dataSource);
- }
+ case FEED:
+ return buildFeedCollectRuntime(jobSpec, dataSource);
case INTERNAL_DATASET: {
// querying an internal dataset
return buildInternalDatasetScan(jobSpec, scanVariables, minFilterVars, maxFilterVars, opSchema,
@@ -376,6 +378,110 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
}
+/**
+ * Builds the feed collect operator for a feed data source, together with the
+ * partition constraint that decides where the operator is scheduled.
+ *
+ * @param jobSpec
+ *            the job specification the operator is added to
+ * @param dataSource
+ *            the data source being scanned; must be a {@link FeedDataSource}
+ * @return the collect operator paired with its location constraint
+ * @throws AlgebricksException
+ *             if the feed has no policy configured, or if any step of the
+ *             construction fails (the failure is attached as the cause)
+ */
+@SuppressWarnings("rawtypes")
+public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedCollectRuntime(JobSpecification jobSpec,
+        IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
+
+    FeedDataSource feedDataSource = (FeedDataSource) dataSource;
+
+    try {
+        ARecordType feedOutputType = (ARecordType) feedDataSource.getItemType();
+        ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
+                .getSerializerDeserializer(feedOutputType);
+        RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+        FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
+                BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+        if (feedPolicy == null) {
+            throw new AlgebricksException("Feed not configured with a policy");
+        }
+        // The policy name is stored in the policy's own property map so that it
+        // travels with the properties handed to the operator below.
+        feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
+        FeedConnectionId feedConnectionId = new FeedConnectionId(feedDataSource.getId().getDataverseName(),
+                feedDataSource.getId().getDatasourceName(), feedDataSource.getTargetDataset());
+        FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec,
+                feedConnectionId, feedDataSource.getSourceFeedId(), feedOutputType, feedDesc,
+                feedPolicy.getProperties(), feedDataSource.getLocation());
+
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedCollector,
+                determineLocationConstraint(feedDataSource));
+
+    } catch (AlgebricksException e) {
+        // Already the declared exception type: rethrow unchanged instead of
+        // burying its message inside a second AlgebricksException.
+        throw e;
+    } catch (Exception e) {
+        throw new AlgebricksException(e);
+    }
+}
+
+/**
+ * Determines the absolute location constraint for the collect operator of a feed.
+ *
+ * For a PRIMARY source feed the locations come from the data source itself, except
+ * when collecting from the compute stage of a different feed, in which case the
+ * compute locations recorded in that feed's activity are used. For a SECONDARY
+ * source feed the locations always come from the source feed's activity: its
+ * collect locations for the intake stage, its compute locations for the compute stage.
+ *
+ * @param feedDataSource
+ *            the feed data source whose hand-off locations are resolved
+ * @return a constraint built from the resolved locations
+ * @throws AsterixException
+ *             if no locations can be discovered for the source feed
+ */
+private AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource)
+        throws AsterixException {
+    String[] locationArray = null;
+    String locations = null;
+    switch (feedDataSource.getSourceFeedType()) {
+        case PRIMARY:
+            switch (feedDataSource.getLocation()) {
+                case SOURCE_FEED_COMPUTE_STAGE:
+                    if (feedDataSource.getFeed().getFeedId().equals(feedDataSource.getSourceFeedId())) {
+                        locationArray = feedDataSource.getLocations();
+                    } else {
+                        FeedActivity primarySourceActivity = findSourceFeedActivity(feedDataSource);
+                        if (primarySourceActivity != null) {
+                            locations = primarySourceActivity.getFeedActivityDetails().get(
+                                    FeedActivityDetails.COMPUTE_LOCATIONS);
+                        }
+                    }
+                    break;
+                case SOURCE_FEED_INTAKE_STAGE:
+                    locationArray = feedDataSource.getLocations();
+                    break;
+            }
+            break;
+        case SECONDARY:
+            FeedActivity secondarySourceActivity = findSourceFeedActivity(feedDataSource);
+            if (secondarySourceActivity != null) {
+                switch (feedDataSource.getLocation()) {
+                    case SOURCE_FEED_INTAKE_STAGE:
+                        locations = secondarySourceActivity.getFeedActivityDetails().get(
+                                FeedActivityDetails.COLLECT_LOCATIONS);
+                        break;
+                    case SOURCE_FEED_COMPUTE_STAGE:
+                        locations = secondarySourceActivity.getFeedActivityDetails().get(
+                                FeedActivityDetails.COMPUTE_LOCATIONS);
+                        break;
+                }
+            }
+            break;
+    }
+
+    // Locations read from a feed activity are stored as a comma-separated string.
+    if (locationArray == null && locations != null) {
+        locationArray = locations.split(",");
+    }
+
+    // Fail with a descriptive exception on every path, rather than letting a
+    // null location array surface later as a NullPointerException.
+    if (locationArray == null) {
+        String message = "Unable to discover location(s) for source feed data hand-off "
+                + feedDataSource.getSourceFeedId();
+        if (LOGGER.isLoggable(Level.SEVERE)) {
+            LOGGER.severe(message);
+        }
+        throw new AsterixException(message);
+    }
+    return new AlgebricksAbsolutePartitionConstraint(locationArray);
+}
+
+/**
+ * Returns the first registered feed activity whose dataverse and feed name match
+ * the source feed of the given data source, or {@code null} if there is none.
+ */
+private FeedActivity findSourceFeedActivity(FeedDataSource feedDataSource) {
+    for (FeedActivity candidate : centralFeedManager.getFeedLoadManager().getFeedActivities()) {
+        if (candidate.getDataverseName().equals(feedDataSource.getSourceFeedId().getDataverse())
+                && candidate.getFeedName().equals(feedDataSource.getSourceFeedId().getFeedName())) {
+            return candidate;
+        }
+    }
+    return null;
+}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(JobSpecification jobSpec,
LoadableDataSource alds, IAdapterFactory adapterFactory, RecordDescriptor rDesc, boolean isPKAutoGenerated,
List<List<String>> primaryKeys, ARecordType recType, int pkIndex) throws AlgebricksException {
@@ -411,7 +517,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
JobGenContext context, Object implConfig) throws AlgebricksException, MetadataException {
AqlSourceId asid = dataSource.getId();
String dataverseName = asid.getDataverseName();
- String datasetName = asid.getDatasetName();
+ String datasetName = asid.getDatasourceName();
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
int[] minFilterFieldIndexes = null;
@@ -439,8 +545,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
private IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
- Map<String, String> configuration, IAType itemType, boolean isPKAutoGenerated,
- List<List<String>> primaryKeys) throws AlgebricksException {
+ Map<String, String> configuration, IAType itemType, boolean isPKAutoGenerated, List<List<String>> primaryKeys)
+ throws AlgebricksException {
IAdapterFactory adapterFactory;
DatasourceAdapter adapterEntity;
String adapterFactoryClassname;
@@ -458,6 +564,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
}
+ adapterFactory.configure(configuration, (ARecordType) itemType);
+
// check to see if dataset is indexed
Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(),
@@ -472,18 +580,11 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
iterator.remove();
}
}
- ((IGenericAdapterFactory) adapterFactory).setFiles(files);
+ // TODO Check this call, result of merge from master!
+ // ((IGenericAdapterFactory) adapterFactory).setFiles(files);
}
-
- switch (adapterFactory.getAdapterType()) {
- case GENERIC:
- ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) itemType);
- break;
- case TYPED:
- ((ITypedAdapterFactory) adapterFactory).configure(configuration);
- break;
- }
- return adapterFactory;
+
+ return adapterFactory;
} catch (Exception e) {
throw new AlgebricksException("Unable to create adapter " + e);
}
@@ -539,78 +640,30 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, apc);
}
- @SuppressWarnings("rawtypes")
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(JobSpecification jobSpec,
- IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
-
- FeedDataSource feedDataSource = (FeedDataSource) dataSource;
+ public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IFeedAdapterFactory> buildFeedIntakeRuntime(
+ JobSpecification jobSpec, PrimaryFeed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
+ Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+ factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx);
+ IFeedAdapterFactory adapterFactory = factoryOutput.first;
FeedIntakeOperatorDescriptor feedIngestor = null;
- Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
- AlgebricksPartitionConstraint constraint = null;
-
- try {
- factoryOutput = FeedUtil.getFeedFactoryAndOutput(feedDataSource.getFeed(), mdTxnCtx);
- IAdapterFactory adapterFactory = factoryOutput.first;
- ARecordType adapterOutputType = factoryOutput.second;
- AdapterType adapterType = factoryOutput.third;
-
- ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
- .getSerializerDeserializer(adapterOutputType);
- RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
- FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
- BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
- if (feedPolicy == null) {
- throw new AlgebricksException("Feed not configured with a policy");
- }
- feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
- switch (adapterType) {
- case INTERNAL:
- feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
- feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
- .getFeedConnectionId().getDatasetName()), adapterFactory, adapterOutputType,
- feedDesc, feedPolicy.getProperties());
- break;
- case EXTERNAL:
- String libraryName = feedDataSource.getFeed().getAdapterName().split("#")[0];
- feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feedDataSource.getFeedConnectionId(),
- libraryName, adapterFactory.getClass().getName(), feedDataSource.getFeed()
- .getAdapterConfiguration(), adapterOutputType, feedDesc, feedPolicy.getProperties());
- break;
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Cofigured feed intake operator with " + adapterType + " adapter");
- }
- constraint = factoryOutput.first.getPartitionConstraint();
- } catch (Exception e) {
- throw new AlgebricksException(e);
+ switch (factoryOutput.third) {
+ case INTERNAL:
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, adapterFactory,
+ factoryOutput.second, policyAccessor);
+ break;
+ case EXTERNAL:
+ String libraryName = primaryFeed.getAdaptorName().trim()
+ .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName, adapterFactory
+ .getClass().getName(), factoryOutput.second, policyAccessor);
+ break;
}
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedIngestor, constraint);
- }
-
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
- JobSpecification jobSpec, String dataverse, String feedName, String dataset, IFeedMessage feedMessage,
- String[] locations) throws AlgebricksException {
- AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations);
- FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, feedName,
- dataset, feedMessage);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
- }
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
- JobSpecification jobSpec, String dataverse, String feedName, String dataset, FeedActivity feedActivity)
- throws AlgebricksException {
- List<String> feedLocations = new ArrayList<String>();
- String[] ingestLocs = feedActivity.getFeedActivityDetails().get(FeedActivityDetails.INGEST_LOCATIONS)
- .split(",");
- for (String loc : ingestLocs) {
- feedLocations.add(loc);
- }
- FeedConnectionId feedId = new FeedConnectionId(dataverse, feedName, dataset);
- String[] locations = feedLocations.toArray(new String[] {});
- IFeedMessage feedMessage = new EndFeedMessage(feedId);
- return buildSendFeedMessageRuntime(jobSpec, dataverse, feedName, dataset, feedMessage, locations);
+ AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
+ return new Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IFeedAdapterFactory>(feedIngestor,
+ partitionConstraint, adapterFactory);
}
+
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
@@ -934,7 +987,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException, MetadataException {
- Dataset dataset = findDataset(aqlId.getDataverseName(), aqlId.getDatasetName());
+ Dataset dataset = findDataset(aqlId.getDataverseName(), aqlId.getDatasourceName());
if (dataset == null) {
throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
}
@@ -942,7 +995,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, aqlId.getDataverseName(), tName).getDatatype();
AqlDataSourceType datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL) ? AqlDataSourceType.EXTERNAL_DATASET
: AqlDataSourceType.INTERNAL_DATASET;
- return new DatasetDataSource(aqlId, aqlId.getDataverseName(), aqlId.getDatasetName(), itemType, datasourceType);
+ return new DatasetDataSource(aqlId, aqlId.getDataverseName(), aqlId.getDatasourceName(), itemType, datasourceType);
}
@Override
@@ -971,7 +1024,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
JobSpecification spec) throws AlgebricksException {
String dataverseName = dataSource.getId().getDataverseName();
- String datasetName = dataSource.getId().getDatasetName();
+ String datasetName = dataSource.getId().getDatasourceName();
Dataset dataset = findDataset(dataverseName, datasetName);
if (dataset == null) {
@@ -1055,7 +1108,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
throws AlgebricksException {
- String datasetName = dataSource.getId().getDatasetName();
+ String datasetName = dataSource.getId().getDatasourceName();
Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
if (dataset == null) {
throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse "
@@ -1173,7 +1226,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
- String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
+ String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
Dataset dataset = findDataset(dataverseName, datasetName);
if (dataset == null) {
@@ -1234,7 +1287,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
String indexName = dataSourceIndex.getId();
String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
- String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
+ String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
IOperatorSchema inputSchema = new OperatorSchemaImpl();
if (inputSchemas.length > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java
index 4cee35d..be9f0e2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java
@@ -20,23 +20,23 @@ import java.io.File;
public class AqlSourceId {
private String dataverseName;
- private String datasetName;
+ private String datasourceName;
- public AqlSourceId(String dataverseName, String datasetName) {
+ public AqlSourceId(String dataverseName, String datasourceName) {
this.dataverseName = dataverseName;
- this.datasetName = datasetName;
+ this.datasourceName = datasourceName;
}
@Override
public String toString() {
- return dataverseName + File.pathSeparator + datasetName;
+ return dataverseName + File.pathSeparator + datasourceName;
}
public String getDataverseName() {
return dataverseName;
}
- public String getDatasetName() {
- return datasetName;
+ public String getDatasourceName() {
+ return datasourceName;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
index 059a10c..fd06e99 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
@@ -21,7 +21,7 @@ public class DatasetDataSource extends AqlDataSource {
public DatasetDataSource(AqlSourceId id, String datasourceDataverse, String datasourceName, IAType itemType,
AqlDataSourceType datasourceType) throws AlgebricksException {
- super(id, datasourceDataverse, datasourceName, datasourceType);
+ super(id, datasourceDataverse, datasourceName, itemType, datasourceType);
MetadataTransactionContext ctx = null;
try {
ctx = MetadataManager.INSTANCE.beginTransaction();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java
index 695ae31..44402fc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java
@@ -14,32 +14,42 @@
*/
package edu.uci.ics.asterix.metadata.declared;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.asterix.metadata.entities.Feed.FeedType;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
public class FeedDataSource extends AqlDataSource {
private Feed feed;
- private final FeedConnectionId feedConnectionId;
+ private final FeedId sourceFeedId;
+ private final FeedType sourceFeedType;
+ private final ConnectionLocation location;
+ private final String targetDataset;
+ private final String[] locations;
+ private final int computeCardinality;
- public FeedDataSource(AqlSourceId id, FeedConnectionId feedId, IAType itemType, AqlDataSourceType dataSourceType)
+ public FeedDataSource(AqlSourceId id, String targetDataset, IAType itemType, AqlDataSourceType dataSourceType,
+ FeedId sourceFeedId, FeedType sourceFeedType, ConnectionLocation location, String[] locations)
throws AlgebricksException {
- super(id, feedId.getDataverse(), feedId.getFeedName(), dataSourceType);
- this.feedConnectionId = feedId;
- feed = null;
+ super(id, id.getDataverseName(), id.getDatasourceName(), itemType, dataSourceType);
+ this.targetDataset = targetDataset;
+ this.sourceFeedId = sourceFeedId;
+ this.sourceFeedType = sourceFeedType;
+ this.location = location;
+ this.locations = locations;
+ this.computeCardinality = AsterixClusterProperties.INSTANCE.getParticipantNodes().size();
MetadataTransactionContext ctx = null;
try {
MetadataManager.INSTANCE.acquireReadLatch();
ctx = MetadataManager.INSTANCE.beginTransaction();
- feed = MetadataManager.INSTANCE.getFeed(ctx, feedId.getDataverse(), feedId.getFeedName());
- if (feed == null) {
- throw new AlgebricksException("Unknown feed " + feedId);
- }
+ this.feed = MetadataManager.INSTANCE.getFeed(ctx, id.getDataverseName(), id.getDatasourceName());
MetadataManager.INSTANCE.commitTransaction(ctx);
initFeedDataSource(itemType);
} catch (Exception e) {
@@ -71,8 +81,20 @@ public class FeedDataSource extends AqlDataSource {
return domain;
}
- public FeedConnectionId getFeedConnectionId() {
- return feedConnectionId;
+ public String getTargetDataset() {
+ return targetDataset;
+ }
+
+ public FeedId getSourceFeedId() {
+ return sourceFeedId;
+ }
+
+ public ConnectionLocation getLocation() {
+ return location;
+ }
+
+ public String[] getLocations() {
+ return locations;
}
private void initFeedDataSource(IAType itemType) {
@@ -91,4 +113,12 @@ public class FeedDataSource extends AqlDataSource {
};
domain = domainForExternalData;
}
+
+ public FeedType getSourceFeedType() {
+ return sourceFeedType;
+ }
+
+ public int getComputeCardinality() {
+ return computeCardinality;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java
index 5701292..f4be491 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java
@@ -4,8 +4,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java
index cbf771b..fd4b6a7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java
@@ -1,7 +1,10 @@
package edu.uci.ics.asterix.metadata.declared;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory.SupportedOperation;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -40,11 +43,7 @@ public class FieldExtractingAdapterFactory implements IAdapterFactory {
return "FieldExtractingAdapter[ " + wrappedAdapterFactory.getName() + " ]";
}
- @Override
- public AdapterType getAdapterType() {
- return wrappedAdapterFactory.getAdapterType();
- }
-
+
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
return wrappedAdapterFactory.getPartitionConstraint();
@@ -55,5 +54,15 @@ public class FieldExtractingAdapterFactory implements IAdapterFactory {
IDatasourceAdapter wrappedAdapter = wrappedAdapterFactory.createAdapter(ctx, partition);
return new FieldExtractingAdapter(ctx, inRecDesc, outRecDesc, extractFields, rType, wrappedAdapter);
}
+
+ @Override
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+ wrappedAdapterFactory.configure(configuration, outputType);
+ }
+
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return wrappedAdapterFactory.getAdapterOutputType();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java
index 2dc840e..c91e3f6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java
@@ -51,7 +51,7 @@ public class LoadableDataSource extends AqlDataSource {
public LoadableDataSource(Dataset targetDataset, IAType itemType, String adapter, Map<String, String> properties)
throws AlgebricksException, IOException {
- super(new AqlSourceId("loadable_dv", "loadable_ds"), "loadable_dv", "loadable_source",
+ super(new AqlSourceId("loadable_dv", "loadable_ds"), "loadable_dv", "loadable_source", itemType,
AqlDataSourceType.LOADABLE);
this.targetDataset = targetDataset;
this.adapter = adapter;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java
index 0938ccc..862092a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java
@@ -18,7 +18,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import edu.uci.ics.asterix.builders.RecordBuilder;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
import edu.uci.ics.asterix.om.base.AMutableUUID;
import edu.uci.ics.asterix.om.base.AUUID;
import edu.uci.ics.asterix.om.pointables.ARecordPointable;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java
index e371b2b..0a36ea8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java
@@ -14,8 +14,10 @@
*/
package edu.uci.ics.asterix.metadata.declared;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -58,11 +60,6 @@ public class PKGeneratingAdapterFactory implements IAdapterFactory {
}
@Override
- public AdapterType getAdapterType() {
- return wrappedAdapterFactory.getAdapterType();
- }
-
- @Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
return wrappedAdapterFactory.getPartitionConstraint();
}
@@ -72,4 +69,14 @@ public class PKGeneratingAdapterFactory implements IAdapterFactory {
IDatasourceAdapter wrappedAdapter = wrappedAdapterFactory.createAdapter(ctx, partition);
return new PKGeneratingAdapter(ctx, inRecDesc, outRecDesc, inRecType, outRecType, wrappedAdapter, pkIndex);
}
+
+ @Override
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+ wrappedAdapterFactory.configure(configuration, outputType);
+ }
+
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return wrappedAdapterFactory.getAdapterOutputType();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java
index 39aa8ab..53b75a6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java
@@ -15,62 +15,67 @@
package edu.uci.ics.asterix.metadata.entities;
-import java.util.Map;
-
+import edu.uci.ics.asterix.common.feeds.FeedId;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.metadata.MetadataCache;
import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
/**
- * Metadata describing a feed.
+ * Feed POJO
*/
public class Feed implements IMetadataEntity {
private static final long serialVersionUID = 1L;
- private final String dataverseName;
- private final String feedName;
- private final String adapterName;
- private final Map<String, String> adapterConfiguration;
- private final FunctionSignature appliedFunction;
-
- public Feed(String dataverseName, String datasetName, String adapterName, Map<String, String> adapterConfiguration,
- FunctionSignature appliedFunction) {
- this.dataverseName = dataverseName;
- this.feedName = datasetName;
- this.adapterName = adapterName;
- this.adapterConfiguration = adapterConfiguration;
- this.appliedFunction = appliedFunction;
+ /** A unique identifier for the feed */
+ protected final FeedId feedId;
+
+ /** The function that is to be applied on each incoming feed tuple **/
+ protected final FunctionSignature appliedFunction;
+
+ /** The type {@code FeedType} associated with the feed. **/
+ protected final FeedType feedType;
+
+ /** A string representation of the instance **/
+ protected final String displayName;
+
+ public enum FeedType {
+ /**
+ * A feed that derives its data from an external source.
+ */
+ PRIMARY,
+
+ /**
+ * A feed that derives its data from another primary or secondary feed.
+ */
+ SECONDARY
}
- public String getDataverseName() {
- return dataverseName;
+ public Feed(String dataverseName, String datasetName, FunctionSignature appliedFunction, FeedType feedType) {
+ this.feedId = new FeedId(dataverseName, datasetName);
+ this.appliedFunction = appliedFunction;
+ this.feedType = feedType;
+ this.displayName = feedType + "(" + feedId + ")";
}
- public String getFeedName() {
- return feedName;
+ public FeedId getFeedId() {
+ return feedId;
}
- public String getAdapterName() {
- return adapterName;
+ public String getDataverseName() {
+ return feedId.getDataverse();
}
- public Map<String, String> getAdapterConfiguration() {
- return adapterConfiguration;
+ public String getFeedName() {
+ return feedId.getFeedName();
}
public FunctionSignature getAppliedFunction() {
return appliedFunction;
}
- @Override
- public Object addToCache(MetadataCache cache) {
- return cache.addFeedIfNotExists(this);
- }
-
- @Override
- public Object dropFromCache(MetadataCache cache) {
- return cache.dropFeed(this);
+ public FeedType getFeedType() {
+ return feedType;
}
@Override
@@ -81,13 +86,27 @@ public class Feed implements IMetadataEntity {
if (!(other instanceof Feed)) {
return false;
}
- Feed otherDataset = (Feed) other;
- if (!otherDataset.dataverseName.equals(dataverseName)) {
- return false;
- }
- if (!otherDataset.feedName.equals(feedName)) {
- return false;
- }
- return true;
+ Feed otherFeed = (Feed) other;
+ return otherFeed.getFeedId().equals(feedId);
+ }
+
+ /**
+  * Hashes on the feed identifier only, i.e. on exactly the state that
+  * {@link #equals(Object)} compares. Hashing the display name instead would also mix in
+  * the feed type, allowing two feeds that are equal (same id) to produce different hash
+  * codes and thus break hash-based collections.
+  * NOTE(review): relies on {@code FeedId} overriding {@code hashCode()} consistently
+  * with its {@code equals()} -- confirm.
+  */
+ @Override
+ public int hashCode() {
+     return feedId.hashCode();
+ }
+
+ /**
+  * Returns the display name computed once at construction time, which has the form
+  * {@code <feedType>(<feedId>)}. Reusing the cached value keeps a single definition of
+  * the textual form instead of repeating the formatting expression here.
+  */
+ @Override
+ public String toString() {
+     return displayName;
+ }
+
+ /**
+  * Registers this feed with the given metadata cache by delegating to
+  * {@code MetadataCache.addFeedIfNotExists}.
+  *
+  * @param cache
+  *            the metadata cache to add this feed to
+  * @return whatever the cache's {@code addFeedIfNotExists} call returns
+  */
+ @Override
+ public Object addToCache(MetadataCache cache) {
+ return cache.addFeedIfNotExists(this);
+ }
+
+ /**
+  * Removes this feed from the given metadata cache by delegating to
+  * {@code MetadataCache.dropFeed}.
+  *
+  * @param cache
+  *            the metadata cache to drop this feed from
+  * @return whatever the cache's {@code dropFeed} call returns
+  */
+ @Override
+ public Object dropFromCache(MetadataCache cache) {
+ return cache.dropFeed(this);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
deleted file mode 100644
index 679276f..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.asterix.metadata.entities;
-
-import java.util.Map;
-
-import edu.uci.ics.asterix.metadata.MetadataCache;
-import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
-
-/**
- * Metadata describing a feed activity record.
- */
-public class FeedActivity implements IMetadataEntity, Comparable<FeedActivity> {
-
- private static final long serialVersionUID = 1L;
-
- private int activityId;
-
- private final String dataverseName;
- private final String datasetName;
- private final String feedName;
-
- private String lastUpdatedTimestamp;
- private FeedActivityType activityType;
- private Map<String, String> feedActivityDetails;
-
- public static enum FeedActivityType {
- FEED_BEGIN,
- FEED_FAILURE,
- FEED_END
- }
-
- public static class FeedActivityDetails {
- public static final String COMPUTE_LOCATIONS = "compute-locations";
- public static final String INGEST_LOCATIONS = "ingest-locations";
- public static final String STORAGE_LOCATIONS = "storage-locations";
- public static final String TOTAL_INGESTED = "total-ingested";
- public static final String INGESTION_RATE = "ingestion-rate";
- public static final String EXCEPTION_LOCATION = "exception-location";
- public static final String EXCEPTION_MESSAGE = "exception-message";
- public static final String FEED_POLICY_NAME = "feed-policy-name";
- public static final String SUPER_FEED_MANAGER_HOST = "super-feed-manager-host";
- public static final String SUPER_FEED_MANAGER_PORT = "super-feed-manager-port";
- public static final String FEED_NODE_FAILURE = "feed-node-failure";
-
- }
-
- public FeedActivity(String dataverseName, String feedName, String datasetName, FeedActivityType feedActivityType,
- Map<String, String> feedActivityDetails) {
- this.dataverseName = dataverseName;
- this.feedName = feedName;
- this.datasetName = datasetName;
- this.activityType = feedActivityType;
- this.feedActivityDetails = feedActivityDetails;
- }
-
- public String getDataverseName() {
- return dataverseName;
- }
-
- public String getDatasetName() {
- return datasetName;
- }
-
- public String getFeedName() {
- return feedName;
- }
-
- @Override
- public Object addToCache(MetadataCache cache) {
- return cache.addFeedActivityIfNotExists(this);
- }
-
- @Override
- public Object dropFromCache(MetadataCache cache) {
- return cache.dropFeedActivity(this);
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (!(other instanceof FeedActivity)) {
- return false;
- }
-
- if (!((FeedActivity) other).dataverseName.equals(dataverseName)) {
- return false;
- }
- if (!((FeedActivity) other).datasetName.equals(datasetName)) {
- return false;
- }
- if (!((FeedActivity) other).getFeedName().equals(feedName)) {
- return false;
- }
- if (!((FeedActivity) other).getFeedActivityType().equals(activityType)) {
- return false;
- }
- if (((FeedActivity) other).getActivityId() != (activityId)) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return toString().hashCode();
- }
-
- @Override
- public String toString() {
- return dataverseName + "." + feedName + " --> " + datasetName + " " + activityType + " " + activityId;
- }
-
- public FeedActivityType getFeedActivityType() {
- return activityType;
- }
-
- public void setFeedActivityType(FeedActivityType feedActivityType) {
- this.activityType = feedActivityType;
- }
-
- public String getLastUpdatedTimestamp() {
- return lastUpdatedTimestamp;
- }
-
- public void setLastUpdatedTimestamp(String lastUpdatedTimestamp) {
- this.lastUpdatedTimestamp = lastUpdatedTimestamp;
- }
-
- public int getActivityId() {
- return activityId;
- }
-
- public void setActivityId(int activityId) {
- this.activityId = activityId;
- }
-
- public Map<String, String> getFeedActivityDetails() {
- return feedActivityDetails;
- }
-
- public void setFeedActivityDetails(Map<String, String> feedActivityDetails) {
- this.feedActivityDetails = feedActivityDetails;
- }
-
- public FeedActivityType getActivityType() {
- return activityType;
- }
-
- public void setActivityType(FeedActivityType activityType) {
- this.activityType = activityType;
- }
-
- @Override
- public int compareTo(FeedActivity o) {
- return o.getActivityId() - this.activityId;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java
new file mode 100644
index 0000000..62f0d07
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.entities;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+
+/**
+ * A primary feed is one that derives its data from an external source via an adaptor.
+ * This class is a holder object for the metadata associated with a primary feed: the
+ * name of the adaptor and the configuration it is associated with.
+ */
+public class PrimaryFeed extends Feed implements IMetadataEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Name of the adaptor associated with this feed. */
+    private final String adaptorName;
+
+    /** Key/value configuration associated with the adaptor. */
+    private final Map<String, String> adaptorConfiguration;
+
+    /**
+     * Creates the metadata for a primary feed; the feed type is fixed to
+     * {@link FeedType#PRIMARY}.
+     *
+     * @param dataverseName
+     *            forwarded to the {@link Feed} constructor as the dataverse name
+     * @param datasetName
+     *            forwarded to the {@link Feed} constructor as its second identifier component
+     * @param adaptorName
+     *            name of the adaptor associated with this feed
+     * @param adaptorConfiguration
+     *            configuration for the adaptor; stored by reference, not copied
+     * @param appliedFunction
+     *            forwarded to the {@link Feed} constructor
+     */
+    public PrimaryFeed(String dataverseName, String datasetName, String adaptorName,
+            Map<String, String> adaptorConfiguration, FunctionSignature appliedFunction) {
+        super(dataverseName, datasetName, appliedFunction, FeedType.PRIMARY);
+        this.adaptorName = adaptorName;
+        this.adaptorConfiguration = adaptorConfiguration;
+    }
+
+    public String getAdaptorName() {
+        return adaptorName;
+    }
+
+    public Map<String, String> getAdaptorConfiguration() {
+        return adaptorConfiguration;
+    }
+
+    /**
+     * Two primary feeds are equal when the {@link Feed}-level comparison succeeds and both
+     * the adaptor name and the complete adaptor configuration match.
+     * <p>
+     * The configurations are compared with {@link Map#equals(Object)}, so the relation is
+     * symmetric: a configuration that is merely a subset of the other no longer compares
+     * as equal. Null adaptor names, null configurations and null configuration values are
+     * tolerated instead of raising a {@link NullPointerException}.
+     * <p>
+     * The inherited {@code hashCode()} stays consistent with this method because equal
+     * primary feeds necessarily share the same feed id and feed type.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!super.equals(other) || !(other instanceof PrimaryFeed)) {
+            return false;
+        }
+
+        PrimaryFeed otherFeed = (PrimaryFeed) other;
+        return nullSafeEquals(adaptorName, otherFeed.getAdaptorName())
+                && nullSafeEquals(adaptorConfiguration, otherFeed.getAdaptorConfiguration());
+    }
+
+    /** Returns true when both references are null or {@code a.equals(b)} holds. */
+    private static boolean nullSafeEquals(Object a, Object b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    @Override
+    public String toString() {
+        return "PrimaryFeed (" + adaptorName + ")";
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java
new file mode 100644
index 0000000..c1f51ba
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.entities;
+
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+
+/**
+ * Metadata holder for a secondary feed, that is, a feed whose data is derived from
+ * another (primary or secondary) feed.
+ */
+public class SecondaryFeed extends Feed implements IMetadataEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Name of the feed this secondary feed derives its data from. */
+    private final String sourceFeedName;
+
+    /**
+     * Creates the metadata for a secondary feed; the feed type is fixed to
+     * {@link FeedType#SECONDARY}.
+     *
+     * @param dataverseName
+     *            forwarded to the {@link Feed} constructor as the dataverse name
+     * @param feedName
+     *            forwarded to the {@link Feed} constructor as its second identifier component
+     * @param sourceFeedName
+     *            name of the feed that acts as the data source
+     * @param appliedFunction
+     *            forwarded to the {@link Feed} constructor
+     */
+    public SecondaryFeed(String dataverseName, String feedName, String sourceFeedName, FunctionSignature appliedFunction) {
+        super(dataverseName, feedName, appliedFunction, FeedType.SECONDARY);
+        this.sourceFeedName = sourceFeedName;
+    }
+
+    public String getSourceFeedName() {
+        return sourceFeedName;
+    }
+
+    /**
+     * Two secondary feeds are equal when the {@link Feed}-level comparison succeeds and
+     * both name the same source feed.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        // Same evaluation order as before: base-class comparison first, then the type check.
+        boolean comparable = super.equals(other) && other instanceof SecondaryFeed;
+        return comparable && ((SecondaryFeed) other).getSourceFeedName().equals(sourceFeedName);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("SecondaryFeed (");
+        text.append(feedId).append(")");
+        text.append("<--");
+        text.append("(").append(sourceFeedName).append(")");
+        return text.toString();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
index 1bb34d2..580881d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
@@ -85,7 +85,7 @@ public class DatasourceAdapterTupleTranslator extends AbstractTupleTranslator<Da
aString.setValue(adapter.getAdapterIdentifier().getNamespace());
stringSerde.serialize(aString, tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
- aString.setValue(adapter.getAdapterIdentifier().getAdapterName());
+ aString.setValue(adapter.getAdapterIdentifier().getName());
stringSerde.serialize(aString, tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
@@ -101,7 +101,7 @@ public class DatasourceAdapterTupleTranslator extends AbstractTupleTranslator<Da
// write field 1
fieldValue.reset();
- aString.setValue(adapter.getAdapterIdentifier().getAdapterName());
+ aString.setValue(adapter.getAdapterIdentifier().getName());
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_ARECORD_NAME_FIELD_INDEX, fieldValue);