You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:16 UTC
[22/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
index 44af0ff..d67ca0d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
@@ -38,6 +38,7 @@ import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
import org.apache.asterix.external.operators.ExternalIndexBulkModifyOperatorDescriptor;
import org.apache.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
@@ -46,7 +47,6 @@ import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
index 4d887dc..418d143 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
@@ -31,12 +31,12 @@ import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
index c6e04df..4fae7e9 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
@@ -26,15 +26,15 @@ import java.util.logging.Logger;
import org.apache.asterix.common.api.IClusterEventsSubscriber;
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.api.IClusterManagementWorkResponse;
+import org.apache.asterix.common.config.MetadataConstants;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.feeds.CentralFeedManager;
+import org.apache.asterix.feed.CentralFeedManager;
import org.apache.asterix.file.ExternalIndexingOperations;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index d2164f4..2a7b3e4 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -37,12 +37,12 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.config.AsterixExternalProperties;
import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.config.AsterixReplicationProperties;
-import org.apache.asterix.common.feeds.api.ICentralFeedManager;
import org.apache.asterix.compiler.provider.AqlCompilationProvider;
import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
import org.apache.asterix.event.service.ILookupService;
-import org.apache.asterix.feeds.CentralFeedManager;
-import org.apache.asterix.feeds.FeedLifecycleListener;
+import org.apache.asterix.external.feed.api.ICentralFeedManager;
+import org.apache.asterix.feed.CentralFeedManager;
+import org.apache.asterix.feed.FeedLifecycleListener;
import org.apache.asterix.messaging.CCMessageBroker;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
index 01775ab..b0dfd58 100755
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
@@ -36,6 +36,8 @@ import javax.xml.bind.Unmarshaller;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
import org.apache.asterix.external.library.ExternalLibrary;
import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.external.library.LibraryAdapter;
@@ -44,10 +46,8 @@ import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.api.IMetadataEntity;
import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.DatasourceAdapter.AdapterType;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.Library;
-import org.apache.asterix.metadata.feeds.AdapterIdentifier;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
public class ExternalLibraryBootstrap {
@@ -210,7 +210,8 @@ public class ExternalLibraryBootstrap {
String adapterFactoryClass = adapter.getFactoryClass().trim();
String adapterName = libraryName + "#" + adapter.getName().trim();
AdapterIdentifier aid = new AdapterIdentifier(dataverse, adapterName);
- DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass, AdapterType.EXTERNAL);
+ DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass,
+ IDataSourceAdapter.AdapterType.EXTERNAL);
MetadataManager.INSTANCE.addAdapter(mdTxnCtx, dsa);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Installed adapter: " + adapterName);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
index 2d443c7..d5f1a51 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/FeedBootstrap.java
@@ -18,29 +18,23 @@
*/
package org.apache.asterix.hyracks.bootstrap;
-import org.apache.asterix.feeds.CentralFeedManager;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
+import org.apache.asterix.common.config.MetadataConstants;
+import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.feed.CentralFeedManager;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
public class FeedBootstrap {
- public final static String FEEDS_METADATA_DV = "feeds_metadata";
- public final static String FAILED_TUPLE_DATASET = "failed_tuple";
- public final static String FAILED_TUPLE_DATASET_TYPE = "FailedTupleType";
- public final static String FAILED_TUPLE_DATASET_KEY = "id";
-
public static void setUpInitialArtifacts() throws Exception {
StringBuilder builder = new StringBuilder();
try {
- builder.append("create dataverse " + FEEDS_METADATA_DV + ";" + "\n");
- builder.append("use dataverse " + FEEDS_METADATA_DV + ";" + "\n");
-
- builder.append("create type " + FAILED_TUPLE_DATASET_TYPE + " as open { ");
-
- String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple",
- "message", "timestamp" };
+ builder.append("create dataverse " + FeedConstants.FEEDS_METADATA_DV + ";" + "\n");
+ builder.append("use dataverse " + FeedConstants.FEEDS_METADATA_DV + ";" + "\n");
+ builder.append("create type " + FeedConstants.FAILED_TUPLE_DATASET_TYPE + " as open { ");
+ String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple", "message",
+ "timestamp" };
IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
@@ -52,9 +46,9 @@ public class FeedBootstrap {
builder.append(fieldTypes[i].getTypeName());
}
builder.append("}" + ";" + "\n");
-
- builder.append("create dataset " + FAILED_TUPLE_DATASET + " " + "(" + FAILED_TUPLE_DATASET_TYPE + ")" + " "
- + "primary key " + FAILED_TUPLE_DATASET_KEY + " on " + MetadataConstants.METADATA_NODEGROUP_NAME
+ builder.append("create dataset " + FeedConstants.FAILED_TUPLE_DATASET + " " + "("
+ + FeedConstants.FAILED_TUPLE_DATASET_TYPE + ")" + " " + "primary key "
+ + FeedConstants.FAILED_TUPLE_DATASET_KEY + " on " + MetadataConstants.METADATA_NODEGROUP_NAME
+ ";");
CentralFeedManager.AQLExecutor.executeAQL(builder.toString());
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 76e7856..3341387 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -38,6 +38,7 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.messaging.NCMessageBroker;
@@ -46,7 +47,6 @@ import org.apache.asterix.metadata.MetadataNode;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.api.IMetadataNode;
import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
@@ -114,7 +114,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
runtimeContext.initialize(initialRun);
ncApplicationContext.setApplicationObject(runtimeContext);
- //if replication is enabled, check if there is a replica for this node
+ //If replication is enabled, check if there is a replica for this node
AsterixReplicationProperties asterixReplicationProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getReplicationProperties();
@@ -123,7 +123,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
if (initialRun) {
LOGGER.info("System is being initialized. (first run)");
} else {
- // #. recover if the system is corrupted by checking system state.
+ //#. recover if the system is corrupted by checking system state.
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
systemState = recoveryMgr.getSystemState();
@@ -133,7 +133,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
if (replicationEnabled) {
if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
- //try to perform remote recovery
+ //Try to perform remote recovery
IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
remoteRecoveryMgr.performRemoteRecovery();
performedRemoteRecovery = true;
@@ -152,20 +152,20 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
}
private void startReplicationService() throws IOException {
- //open replication channel
+ //Open replication channel
runtimeContext.getReplicationChannel().start();
- //check the state of remote replicas
+ //Check the state of remote replicas
runtimeContext.getReplicationManager().initializeReplicasState();
if (performedRemoteRecovery) {
- //notify remote replicas about the new IP Address if changed
+ //Notify remote replicas about the new IP Address if changed
//Note: this is a hack since each node right now maintains its own copy of the cluster configuration.
//Once the configuration is centralized on the CC, this step wont be needed.
runtimeContext.getReplicationManager().broadcastNewIPAddress();
}
- //start replication after the state of remote replicas has been initialized.
+ //Start replication after the state of remote replicas has been initialized.
runtimeContext.getReplicationManager().startReplicationThreads();
}
@@ -182,7 +182,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
MetadataBootstrap.stopUniverse();
}
- //clean any temporary files
+ //Clean any temporary files
performLocalCleanUp();
//Note: stopping recovery manager will make a sharp checkpoint
@@ -197,7 +197,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
@Override
public void notifyStartupComplete() throws Exception {
- //send max resource id on this NC to the CC
+ //Send max resource id on this NC to the CC
((INCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId();
AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
@@ -228,9 +228,9 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
throw new IllegalStateException("Metadata node cannot access distributed state");
}
- // This is a special case, we just give the metadataNode directly.
- // This way we can delay the registration of the metadataNode until
- // it is completely initialized.
+ //This is a special case, we just give the metadataNode directly.
+ //This way we can delay the registration of the metadataNode until
+ //it is completely initialized.
MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
systemState == SystemState.NEW_UNIVERSE);
@@ -272,26 +272,26 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
proxy.setMetadataNode(stub);
}
- //clean any temporary files
+ //Clean any temporary files
performLocalCleanUp();
}
private void performLocalCleanUp() {
- //delete working area files from failed jobs
+ //Delete working area files from failed jobs
runtimeContext.getIOManager().deleteWorkspaceFiles();
- //reclaim storage for temporary datasets.
+ //Reclaim storage for temporary datasets.
String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
.getStorageMountingPoints();
for (String ioDevice : ioDevices) {
String tempDatasetsDir = ioDevice + storageDirName + File.separator
- + SplitsAndConstraintsUtil.TEMP_DATASETS_STORAGE_FOLDER;
+ + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
FileUtils.deleteQuietly(new File(tempDatasetsDir));
}
- // TODO
- //reclaim storage for orphaned index artifacts in NCs.
+ //TODO
+ //Reclaim storage for orphaned index artifacts in NCs.
//Note: currently LSM indexes invalid components are deleted when an index is activated.
}
@@ -321,7 +321,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
String nodeIoDevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
String[] ioDevicePaths = nodeIoDevices.trim().split(",");
for (int i = 0; i < ioDevicePaths.length; i++) {
- //construct full store path
+ // construct full store path
ioDevicePaths[i] += File.separator + storeDir;
}
metadataProperties.getStores().put(nodeId, ioDevicePaths);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java b/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
index 8ff6d9b..83beb7c 100644
--- a/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
@@ -36,7 +36,7 @@ import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.asterix.feeds.CentralFeedManager;
+import org.apache.asterix.feed.CentralFeedManager;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm b/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm
index c4dde05..aabf05d 100644
--- a/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm
+++ b/asterix-app/src/test/resources/metadata/results/basic/meta15/meta15.1.adm
@@ -1,3 +1 @@
-{ "DataverseName": "Metadata", "Name": "adapter", "Classname": "org.apache.asterix.external.adapter.factory.GenericAdapterFactory", "Type": "INTERNAL", "Timestamp": "Sun Jan 03 15:39:35 AST 2016" }
-{ "DataverseName": "Metadata", "Name": "socket_adapter", "Classname": "org.apache.asterix.external.runtime.GenericSocketFeedAdapterFactory", "Type": "INTERNAL", "Timestamp": "Sun Jan 03 15:39:35 AST 2016" }
-{ "DataverseName": "Metadata", "Name": "socket_client", "Classname": "org.apache.asterix.external.runtime.SocketClientAdapterFactory", "Type": "INTERNAL", "Timestamp": "Sun Jan 03 15:39:35 AST 2016" }
\ No newline at end of file
+{ "DataverseName": "Metadata", "Name": "adapter", "Classname": "org.apache.asterix.external.adapter.factory.GenericAdapterFactory", "Type": "INTERNAL", "Timestamp": "Sun Jan 10 16:13:18 AST 2016" }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
index f981aca..f3e4605 100644
--- a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": null, "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "SecondaryTypeDetails": null, "Timestamp": "Sat Jun 20 13:55:58 PDT 2015" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": null, "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "SecondaryTypeDetails": null, "Timestamp": "Sat Jun 20 13:55:58 PDT 2015" }
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index e14b558..375d05a 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -27,6 +27,82 @@
ResultOffsetPath="results"
QueryOffsetPath="queries"
QueryFileExtension=".aql">
+ <test-group name="feeds">
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_01">
+ <output-dir compare="Text">feeds_01</output-dir>
+ </compilation-unit>
+ </test-case>
+ <!--Disabled because of sporadic failures. Abdullah will re-enable these tests.
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_02">
+ <output-dir compare="Text">feeds_02</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_03">
+ <output-dir compare="Text">feeds_03</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_04">
+ <output-dir compare="Text">feeds_04</output-dir>
+ </compilation-unit>
+ </test-case>
+
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_06">
+ <output-dir compare="Text">feeds_06</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_07">
+ <output-dir compare="Text">feeds_07</output-dir>
+ </compilation-unit>
+ </test-case>
+
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_08">
+ <output-dir compare="Text">feeds_08</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_09">
+ <output-dir compare="Text">feeds_09</output-dir>
+ </compilation-unit>
+ </test-case>
+
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_10">
+ <output-dir compare="Text">feeds_10</output-dir>
+ </compilation-unit>
+ </test-case>
+
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_11">
+ <output-dir compare="Text">feeds_11</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_12">
+ <output-dir compare="Text">feeds_12</output-dir>
+ </compilation-unit>
+ </test-case>
+
+ <test-case FilePath="feeds">
+ <compilation-unit name="issue_230_feeds">
+ <output-dir compare="Text">issue_230_feeds</output-dir>
+ </compilation-unit>
+ </test-case>
+
+ <test-case FilePath="feeds">
+ <compilation-unit name="issue_711_feeds">
+ <output-dir compare="Text">issue_711_feeds</output-dir>
+ </compilation-unit>
+ </test-case>
+ -->
+
+ </test-group>
<test-group name="flwor">
<test-case FilePath="flwor">
<compilation-unit name="at00">
@@ -6137,83 +6213,6 @@
</compilation-unit>
</test-case>
</test-group>
- <test-group name="feeds">
-
- <!--Disable it because of sporadic failures. Raman will re-enable it.
- <test-case FilePath="feeds">
- <compilation-unit name="feeds_01">
- <output-dir compare="Text">feeds_01</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="feeds">
- <compilation-unit name="feeds_02">
- <output-dir compare="Text">feeds_02</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="feeds">
- <compilation-unit name="feeds_03">
- <output-dir compare="Text">feeds_03</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="feeds">
- <compilation-unit name="feeds_04">
- <output-dir compare="Text">feeds_04</output-dir>
- </compilation-unit>
- </test-case>
-
- <test-case FilePath="feeds">
- <compilation-unit name="feeds_06">
- <output-dir compare="Text">feeds_06</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="feeds">
- <compilation-unit name="feeds_07">
- <output-dir compare="Text">feeds_07</output-dir>
- </compilation-unit>
- </test-case>
-
- <test-case FilePath="feeds">
- <compilation-unit name="feeds_08">
- <output-dir compare="Text">feeds_08</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="feeds">
- <compilation-unit name="feeds_09">
- <output-dir compare="Text">feeds_09</output-dir>
- </compilation-unit>
- </test-case>
-
- <test-case FilePath="feeds">
- <compilation-unit name="feeds_10">
- <output-dir compare="Text">feeds_10</output-dir>
- </compilation-unit>
- </test-case>
-
- <test-case FilePath="feeds">
- <compilation-unit name="feeds_11">
- <output-dir compare="Text">feeds_11</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="feeds">
- <compilation-unit name="feeds_12">
- <output-dir compare="Text">feeds_12</output-dir>
- </compilation-unit>
- </test-case>
-
- <test-case FilePath="feeds">
- <compilation-unit name="issue_230_feeds">
- <output-dir compare="Text">issue_230_feeds</output-dir>
- </compilation-unit>
- </test-case>
-
- <test-case FilePath="feeds">
- <compilation-unit name="issue_711_feeds">
- <output-dir compare="Text">issue_711_feeds</output-dir>
- </compilation-unit>
- </test-case>
- -->
-
- </test-group>
<test-group name="hdfs">
<test-case FilePath="hdfs">
<compilation-unit name="large-record">
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index b8c3f2f..3386252 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Executor;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IFeedManager;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
@@ -79,7 +78,7 @@ public interface IAsterixAppRuntimeContext {
public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
- public IFeedManager getFeedManager();
+ public Object getFeedManager();
public IRemoteRecoveryManager getRemoteRecoveryManager();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index cc7ec84..13ce403 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -197,7 +197,7 @@ public class AsterixPropertiesAccessor {
}
public ClusterPartition getMetadataPartiton() {
- //metadata partition is always the first partition on the metadata node
+ // metadata partition is always the first partition on the metadata node
return nodePartitionsMap.get(metadataNodeName)[0];
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java b/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java
new file mode 100644
index 0000000..943e385
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataConstants.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.config;
+
+public class MetadataConstants {
+
+ // Name of the dataverse the metadata lives in.
+ public final static String METADATA_DATAVERSE_NAME = "Metadata";
+
+ // Name of the node group on which the metadata is stored.
+ public final static String METADATA_NODEGROUP_NAME = "MetadataGroup";
+
+ // Name of the default nodegroup over which internal/feed datasets are partitioned
+ // when no explicit nodegroup is specified at dataset creation time.
+ public static final String METADATA_DEFAULT_NODEGROUP_NAME = "DEFAULT_NG_ALL_NODES";
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index fd1ebb8..d25e51f 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -21,6 +21,7 @@ package org.apache.asterix.common.dataflow;
import java.nio.ByteBuffer;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.FrameDataException;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -41,6 +42,7 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
private final boolean isPrimary;
private AbstractLSMIndex lsmIndex;
+ private int i = 0;
public boolean isPrimary() {
return isPrimary;
@@ -85,7 +87,7 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
int tupleCount = accessor.getTupleCount();
try {
- for (int i = 0; i < tupleCount; i++) {
+ for (; i < tupleCount; i++) {
if (tupleFilter != null) {
frameTuple.reset(accessor, i);
if (!tupleFilter.accept(frameTuple)) {
@@ -117,11 +119,13 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
}
}
} catch (Throwable th) {
- throw new HyracksDataException(th);
+ FrameDataException fde = new FrameDataException(i, th);
+ throw fde;
}
writeBuffer.ensureFrameSize(buffer.capacity());
FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
+ i = 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java b/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
index 136a196..18b5264 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/exceptions/FrameDataException.java
@@ -26,7 +26,7 @@ public class FrameDataException extends HyracksDataException {
private final int tupleIndex;
- public FrameDataException(int tupleIndex, Exception cause) {
+ public FrameDataException(int tupleIndex, Throwable cause) {
super(cause);
this.tupleIndex = tupleIndex;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/BasicMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/BasicMonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/BasicMonitoredBuffer.java
deleted file mode 100644
index 70833fc..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/BasicMonitoredBuffer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFramePostProcessor;
-import org.apache.asterix.common.feeds.api.IFramePreprocessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class BasicMonitoredBuffer extends MonitoredBuffer {
-
- public BasicMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter, FrameTupleAccessor fta,
- RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
- FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
- IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
- super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
- exceptionHandler, callback, nPartitions, policyAccessor);
- }
-
- @Override
- protected boolean monitorProcessingRate() {
- return false;
- }
-
- @Override
- protected boolean logInflowOutflowRate() {
- return false;
- }
-
- @Override
- protected IFramePreprocessor getFramePreProcessor() {
- return null;
- }
-
- @Override
- protected IFramePostProcessor getFramePostProcessor() {
- return null;
- }
-
- @Override
- protected boolean monitorInputQueueLength() {
- return false;
- }
-
- @Override
- protected boolean reportInflowRate() {
- return false;
- }
-
- @Override
- protected boolean reportOutflowRate() {
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java
deleted file mode 100644
index 9865501..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/CollectionRuntime.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.FeedFrameCollector.State;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.asterix.common.feeds.api.ISubscriberRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
-
-/**
- * Represents the feed runtime that collects feed tuples from another feed.
- * In case of a primary feed, the CollectionRuntime collects tuples from the feed
- * intake job. For a secondary feed, tuples are collected from the intake/compute
- * runtime associated with the source feed.
- */
-public class CollectionRuntime extends FeedRuntime implements ISubscriberRuntime {
-
- private final FeedConnectionId connectionId;
- private final ISubscribableRuntime sourceRuntime;
- private final Map<String, String> feedPolicy;
- private FeedFrameCollector frameCollector;
-
- public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
- FeedRuntimeInputHandler inputSideHandler, IFrameWriter outputSideWriter, ISubscribableRuntime sourceRuntime,
- Map<String, String> feedPolicy) {
- super(runtimeId, inputSideHandler, outputSideWriter);
- this.connectionId = connectionId;
- this.sourceRuntime = sourceRuntime;
- this.feedPolicy = feedPolicy;
- }
-
- public State waitTillCollectionOver() throws InterruptedException {
- if (!(isCollectionOver())) {
- synchronized (frameCollector) {
- while (!isCollectionOver()) {
- frameCollector.wait();
- }
- }
- }
- return frameCollector.getState();
- }
-
- private boolean isCollectionOver() {
- return frameCollector.getState().equals(FeedFrameCollector.State.FINISHED)
- || frameCollector.getState().equals(FeedFrameCollector.State.HANDOVER);
- }
-
- @Override
- public void setMode(Mode mode) {
- getInputHandler().setMode(mode);
- }
-
- @Override
- public Map<String, String> getFeedPolicy() {
- return feedPolicy;
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public ISubscribableRuntime getSourceRuntime() {
- return sourceRuntime;
- }
-
- public void setFrameCollector(FeedFrameCollector frameCollector) {
- this.frameCollector = frameCollector;
- }
-
- @Override
- public FeedFrameCollector getFrameCollector() {
- return frameCollector;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/ComputeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/ComputeSideMonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/ComputeSideMonitoredBuffer.java
deleted file mode 100644
index 7ec3fdf..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/ComputeSideMonitoredBuffer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFramePostProcessor;
-import org.apache.asterix.common.feeds.api.IFramePreprocessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class ComputeSideMonitoredBuffer extends MonitoredBuffer {
-
- public ComputeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
- FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
- FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
- IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
- super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
- exceptionHandler, callback, nPartitions, policyAccessor);
- }
-
- @Override
- protected boolean monitorProcessingRate() {
- return true;
- }
-
- protected boolean logInflowOutflowRate() {
- return true;
- }
-
- @Override
- protected boolean monitorInputQueueLength() {
- return true;
- }
-
- @Override
- protected IFramePreprocessor getFramePreProcessor() {
- return null;
- }
-
- @Override
- protected IFramePostProcessor getFramePostProcessor() {
- return null;
- }
-
- @Override
- protected boolean reportOutflowRate() {
- return false;
- }
-
- @Override
- protected boolean reportInflowRate() {
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucket.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucket.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucket.java
deleted file mode 100644
index ccd6547..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucket.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class DataBucket {
-
- private static final AtomicInteger globalBucketId = new AtomicInteger(0);
-
- private final ByteBuffer content;
- private final AtomicInteger readCount;
- private final int bucketId;
-
- private int desiredReadCount;
- private ContentType contentType;
-
- private final DataBucketPool pool;
-
- public enum ContentType {
- DATA, // data (feed tuple)
- EOD, // A signal indicating that there shall be no more data
- EOSD // End of processing of spilled data
- }
-
- public DataBucket(DataBucketPool pool) {
- this.content = ByteBuffer.allocate(pool.getFrameSize());
- this.readCount = new AtomicInteger(0);
- this.pool = pool;
- this.contentType = ContentType.DATA;
- this.bucketId = globalBucketId.incrementAndGet();
- }
-
- public synchronized void reset(ByteBuffer frame) {
- if (frame != null) {
- content.flip();
- System.arraycopy(frame.array(), 0, content.array(), 0, frame.limit());
- content.limit(frame.limit());
- content.position(0);
- }
- }
-
- public synchronized void doneReading() {
- if (readCount.incrementAndGet() == desiredReadCount) {
- readCount.set(0);
- pool.returnDataBucket(this);
- }
- }
-
- public void setDesiredReadCount(int rCount) {
- this.desiredReadCount = rCount;
- }
-
- public ContentType getContentType() {
- return contentType;
- }
-
- public void setContentType(ContentType contentType) {
- this.contentType = contentType;
- }
-
- public synchronized ByteBuffer getContent() {
- return content;
- }
-
- @Override
- public String toString() {
- return "DataBucket [" + bucketId + "]" + " (" + readCount + "," + desiredReadCount + ")";
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucketPool.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucketPool.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucketPool.java
deleted file mode 100644
index 2e7e60c..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DataBucketPool.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.Stack;
-
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent;
-import org.apache.asterix.common.feeds.api.IFeedMemoryManager;
-
-/**
- * Represents a pool of reusable {@link DataBucket}
- */
-public class DataBucketPool implements IFeedMemoryComponent {
-
- /** A unique identifier for the memory component **/
- private final int componentId;
-
- /** The {@link IFeedMemoryManager} for the NodeController **/
- private final IFeedMemoryManager memoryManager;
-
- /** A collection of available data buckets {@link DataBucket} **/
- private final Stack<DataBucket> pool;
-
- /** The total number of data buckets {@link DataBucket} allocated **/
- private int totalAllocation;
-
- /** The fixed frame size as configured for the asterix runtime **/
- private final int frameSize;
-
- public DataBucketPool(int componentId, IFeedMemoryManager memoryManager, int size, int frameSize) {
- this.componentId = componentId;
- this.memoryManager = memoryManager;
- this.pool = new Stack<DataBucket>();
- this.frameSize = frameSize;
- expand(size);
- }
-
- public synchronized void returnDataBucket(DataBucket bucket) {
- pool.push(bucket);
- }
-
- public synchronized DataBucket getDataBucket() {
- if (pool.size() == 0) {
- if (!memoryManager.expandMemoryComponent(this)) {
- return null;
- }
- }
- return pool.pop();
- }
-
- @Override
- public Type getType() {
- return Type.POOL;
- }
-
- @Override
- public int getTotalAllocation() {
- return totalAllocation;
- }
-
- @Override
- public int getComponentId() {
- return componentId;
- }
-
- @Override
- public void expand(int delta) {
- for (int i = 0; i < delta; i++) {
- DataBucket bucket = new DataBucket(this);
- pool.add(bucket);
- }
- totalAllocation += delta;
- }
-
- @Override
- public void reset() {
- totalAllocation -= pool.size();
- pool.clear();
- }
-
- @Override
- public String toString() {
- return "DataBucketPool" + "[" + componentId + "]" + "(" + totalAllocation + ")";
- }
-
- public int getSize() {
- return pool.size();
- }
-
- public int getFrameSize() {
- return frameSize;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
deleted file mode 100644
index d0e371e..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.common.feeds.api.IFeedOperatorOutputSideHandler.Type;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * Provides mechanism for distributing the frames, as received from an operator to a
- * set of registered readers. Each reader typically operates at a different pace. Readers
- * are isolated from each other to ensure that a slow reader does not impact the progress of
- * others.
- **/
-public class DistributeFeedFrameWriter implements IFrameWriter {
-
- private static final Logger LOGGER = Logger.getLogger(DistributeFeedFrameWriter.class.getName());
-
- /** A unique identifier for the feed to which the incoming tuples belong. **/
- private final FeedId feedId;
-
- /** An instance of FrameDistributor that provides the mechanism for distributing a frame to multiple readers, each operating in isolation. **/
- private final FrameDistributor frameDistributor;
-
- /** The original frame writer instantiated as part of job creation **/
- private IFrameWriter writer;
-
- /** The feed operation whose output is being distributed by the DistributeFeedFrameWriter **/
- private final FeedRuntimeType feedRuntimeType;
-
- /** The value of the partition 'i' if this is the i'th instance of the associated operator **/
- private final int partition;
-
- public DistributeFeedFrameWriter(IHyracksTaskContext ctx, FeedId feedId, IFrameWriter writer,
- FeedRuntimeType feedRuntimeType, int partition, FrameTupleAccessor fta, IFeedManager feedManager)
- throws IOException {
- this.feedId = feedId;
- this.frameDistributor = new FrameDistributor(feedId, feedRuntimeType, partition, true,
- feedManager.getFeedMemoryManager(), fta);
- this.feedRuntimeType = feedRuntimeType;
- this.partition = partition;
- this.writer = writer;
- }
-
- public FeedFrameCollector subscribeFeed(FeedPolicyAccessor fpa, IFrameWriter frameWriter,
- FeedConnectionId connectionId) throws Exception {
- FeedFrameCollector collector = null;
- if (!frameDistributor.isRegistered(frameWriter)) {
- collector = new FeedFrameCollector(frameDistributor, fpa, frameWriter, connectionId);
- frameDistributor.registerFrameCollector(collector);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Registered subscriber, new mode " + frameDistributor.getMode());
- }
- return collector;
- } else {
- throw new IllegalStateException("subscriber " + feedId + " already registered");
- }
- }
-
- public void unsubscribeFeed(IFrameWriter recipientFeedFrameWriter) throws Exception {
- boolean success = frameDistributor.deregisterFrameCollector(recipientFeedFrameWriter);
- if (!success) {
- throw new IllegalStateException(
- "Invalid attempt to unregister FeedFrameWriter " + recipientFeedFrameWriter + " not registered.");
- }
- }
-
- public void notifyEndOfFeed() {
- frameDistributor.notifyEndOfFeed();
- }
-
- @Override
- public void close() throws HyracksDataException {
- try {
- frameDistributor.close();
- } finally {
- writer.close();
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
- public void nextFrame(ByteBuffer frame) throws HyracksDataException {
- frameDistributor.nextFrame(frame);
- }
-
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- }
-
- public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
- return frameDistributor.getRegisteredReaders();
- }
-
- public void setWriter(IFrameWriter writer) {
- this.writer = writer;
- }
-
- public Type getType() {
- return IFeedOperatorOutputSideHandler.Type.DISTRIBUTE_FEED_OUTPUT_HANDLER;
- }
-
- @Override
- public String toString() {
- return feedId.toString() + feedRuntimeType + "[" + partition + "]";
- }
-
- public FrameDistributor.DistributionMode getDistributionMode() {
- return frameDistributor.getDistributionMode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedActivity.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedActivity.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedActivity.java
deleted file mode 100644
index 11130a1..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedActivity.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.common.feeds;
-
-import java.util.Map;
-
-public class FeedActivity implements Comparable<FeedActivity> {
-
- private int activityId;
-
- private final String dataverseName;
- private final String datasetName;
- private final String feedName;
- private final Map<String, String> feedActivityDetails;
-
- public static class FeedActivityDetails {
- public static final String INTAKE_LOCATIONS = "intake-locations";
- public static final String COMPUTE_LOCATIONS = "compute-locations";
- public static final String STORAGE_LOCATIONS = "storage-locations";
- public static final String COLLECT_LOCATIONS = "collect-locations";
- public static final String FEED_POLICY_NAME = "feed-policy-name";
- public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp";
-
- }
-
- public FeedActivity(String dataverseName, String feedName, String datasetName,
- Map<String, String> feedActivityDetails) {
- this.dataverseName = dataverseName;
- this.feedName = feedName;
- this.datasetName = datasetName;
- this.feedActivityDetails = feedActivityDetails;
- }
-
- public String getDataverseName() {
- return dataverseName;
- }
-
- public String getDatasetName() {
- return datasetName;
- }
-
- public String getFeedName() {
- return feedName;
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (!(other instanceof FeedActivity)) {
- return false;
- }
-
- if (!((FeedActivity) other).dataverseName.equals(dataverseName)) {
- return false;
- }
- if (!((FeedActivity) other).datasetName.equals(datasetName)) {
- return false;
- }
- if (!((FeedActivity) other).getFeedName().equals(feedName)) {
- return false;
- }
- if (((FeedActivity) other).getActivityId() != (activityId)) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return toString().hashCode();
- }
-
- @Override
- public String toString() {
- return dataverseName + "." + feedName + " --> " + datasetName + " " + activityId;
- }
-
- public String getConnectTimestamp() {
- return feedActivityDetails.get(FeedActivityDetails.FEED_CONNECT_TIMESTAMP);
- }
-
- public int getActivityId() {
- return activityId;
- }
-
- public void setActivityId(int activityId) {
- this.activityId = activityId;
- }
-
- public Map<String, String> getFeedActivityDetails() {
- return feedActivityDetails;
- }
-
- @Override
- public int compareTo(FeedActivity o) {
- return o.getActivityId() - this.activityId;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedCollectRuntimeInputHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
deleted file mode 100644
index 97dc4f8..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * A {@link FeedRuntimeInputHandler} that additionally hands every incoming frame to a {@link FeedFrameCache}
- * before the base class processes it, and exposes the cache's replay and drop operations.
- */
-public class FeedCollectRuntimeInputHandler extends FeedRuntimeInputHandler {
-
-    /** Cache that receives each frame ahead of the base-class processing. */
-    private final FeedFrameCache feedFrameCache;
-
-    /**
-     * Creates the handler and its frame cache.
-     * <p>
-     * All arguments are forwarded unchanged to the {@link FeedRuntimeInputHandler} constructor; {@code ctx},
-     * {@code fta} and {@code coreOperator} are also used to build the {@link FeedFrameCache}.
-     *
-     * @throws IOException declared by this constructor; which call raises it is not visible here
-     */
-    public FeedCollectRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId,
-            FeedRuntimeId runtimeId, IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled,
-            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions)
-            throws IOException {
-        super(ctx, connectionId, runtimeId, coreOperator, fpa, bufferingEnabled, fta, recordDesc, feedManager,
-                nPartitions);
-        feedFrameCache = new FeedFrameCache(ctx, fta, coreOperator);
-    }
-
-    /**
-     * Passes the frame to the cache via {@code sendMessage} and then to the base-class {@code process}.
-     *
-     * @param frame the frame to handle
-     * @throws HyracksDataException if either the cache or the base class fails
-     */
-    public void process(ByteBuffer frame) throws HyracksDataException {
-        // The cache must see the frame first; keep this order.
-        feedFrameCache.sendMessage(frame);
-        super.process(frame);
-    }
-
-    /**
-     * Delegates to {@code FeedFrameCache.replayRecords}.
-     *
-     * @param recordId the record id handed to the cache
-     * @throws HyracksDataException if the cache fails while replaying
-     */
-    public void replayFrom(int recordId) throws HyracksDataException {
-        feedFrameCache.replayRecords(recordId);
-    }
-
-    /**
-     * Delegates to {@code FeedFrameCache.dropTillRecordId}.
-     *
-     * @param recordId the record id handed to the cache
-     */
-    public void dropTill(int recordId) {
-        feedFrameCache.dropTillRecordId(recordId);
-    }
-
-    /**
-     * Delegates to {@code FeedFrameCache.replayAll}.
-     *
-     * @throws HyracksDataException if the cache fails while replaying
-     */
-    public void replayCached() throws HyracksDataException {
-        feedFrameCache.replayAll();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectJobInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectJobInfo.java
deleted file mode 100644
index 4d6a427..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectJobInfo.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IFeedJoint;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-/**
- * {@link FeedJobInfo} for a job of type {@code FEED_CONNECT}.
- * <p>
- * Besides the state kept by the base class, it records the connection id, the feed policy map, the source and
- * compute feed joints, and three location lists (collect, compute, storage). The location lists are not set by
- * the constructor and stay {@code null} until the corresponding setter is called.
- */
-public class FeedConnectJobInfo extends FeedJobInfo {
-
-    private final FeedConnectionId connectionId;
-    private final Map<String, String> feedPolicy;
-    private final IFeedJoint sourceFeedJoint;
-    private IFeedJoint computeFeedJoint;
-
-    // Assigned after construction through the setters below.
-    private List<String> collectLocations;
-    private List<String> computeLocations;
-    private List<String> storageLocations;
-
-    /**
-     * @param jobId            forwarded to {@link FeedJobInfo}
-     * @param state            forwarded to {@link FeedJobInfo}
-     * @param connectionId     id of the feed connection this job belongs to
-     * @param sourceFeedJoint  source feed joint of the connection
-     * @param computeFeedJoint compute feed joint; can be replaced later via
-     *                         {@link #setComputeFeedJoint(IFeedJoint)}
-     * @param spec             forwarded to {@link FeedJobInfo}
-     * @param feedPolicy       feed policy map; stored by reference
-     */
-    public FeedConnectJobInfo(JobId jobId, FeedJobState state, FeedConnectionId connectionId,
-            IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
-            Map<String, String> feedPolicy) {
-        super(jobId, state, FeedJobInfo.JobType.FEED_CONNECT, spec);
-        this.connectionId = connectionId;
-        this.feedPolicy = feedPolicy;
-        this.sourceFeedJoint = sourceFeedJoint;
-        this.computeFeedJoint = computeFeedJoint;
-    }
-
-    /** @return the id of the feed connection */
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    /** @return the feed policy map passed to the constructor */
-    public Map<String, String> getFeedPolicy() {
-        return feedPolicy;
-    }
-
-    /** @return the source feed joint */
-    public IFeedJoint getSourceFeedJoint() {
-        return sourceFeedJoint;
-    }
-
-    /** @return the current compute feed joint */
-    public IFeedJoint getComputeFeedJoint() {
-        return computeFeedJoint;
-    }
-
-    /** @param computeFeedJoint the compute feed joint to store */
-    public void setComputeFeedJoint(IFeedJoint computeFeedJoint) {
-        this.computeFeedJoint = computeFeedJoint;
-    }
-
-    /** @return the collect locations, or {@code null} if none were set */
-    public List<String> getCollectLocations() {
-        return collectLocations;
-    }
-
-    /** @param collectLocations the collect locations to store */
-    public void setCollectLocations(List<String> collectLocations) {
-        this.collectLocations = collectLocations;
-    }
-
-    /** @return the compute locations, or {@code null} if none were set */
-    public List<String> getComputeLocations() {
-        return computeLocations;
-    }
-
-    /** @param computeLocations the compute locations to store */
-    public void setComputeLocations(List<String> computeLocations) {
-        this.computeLocations = computeLocations;
-    }
-
-    /** @return the storage locations, or {@code null} if none were set */
-    public List<String> getStorageLocations() {
-        return storageLocations;
-    }
-
-    /** @param storageLocations the storage locations to store */
-    public void setStorageLocations(List<String> storageLocations) {
-        this.storageLocations = storageLocations;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionId.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionId.java
deleted file mode 100644
index 355d340..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionId.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.Serializable;
-
-/**
- * A unique identifier for a feed connection. A feed connection is an instance of a data feed that is flowing
- * into a dataset; it is identified by the feed's {@link FeedId} together with the target dataset name.
- */
-public class FeedConnectionId implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedId feedId;
-    private final String datasetName;
-
-    /**
-     * @param feedId      id of the feed
-     * @param datasetName name of the dataset the feed flows into
-     */
-    public FeedConnectionId(FeedId feedId, String datasetName) {
-        this.feedId = feedId;
-        this.datasetName = datasetName;
-    }
-
-    /**
-     * Convenience constructor that builds the {@link FeedId} from its dataverse and feed name.
-     *
-     * @param dataverse   dataverse of the feed
-     * @param feedName    name of the feed
-     * @param datasetName name of the dataset the feed flows into
-     */
-    public FeedConnectionId(String dataverse, String feedName, String datasetName) {
-        this(new FeedId(dataverse, feedName), datasetName);
-    }
-
-    /** @return the id of the feed */
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    /** @return the name of the target dataset */
-    public String getDatasetName() {
-        return datasetName;
-    }
-
-    /**
-     * Two connection ids are equal when both their feed ids and their dataset names are equal.
-     *
-     * @param o the object to compare against; may be null
-     * @return {@code true} if {@code o} is a {@code FeedConnectionId} denoting the same connection
-     */
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof FeedConnectionId)) {
-            // instanceof is false for null, so this also rejects o == null.
-            return false;
-        }
-        if (this == o) {
-            return true;
-        }
-        final FeedConnectionId that = (FeedConnectionId) o;
-        return that.getFeedId().equals(feedId) && that.getDatasetName().equals(datasetName);
-    }
-
-    /**
-     * @return the hash code of {@link #toString()}, which is built from the feed id and the dataset name
-     */
-    @Override
-    public int hashCode() {
-        final String representation = toString();
-        return representation.hashCode();
-    }
-
-    /**
-     * @return the feed id's string form, followed by {@code -->} and the dataset name
-     */
-    @Override
-    public String toString() {
-        final String feed = feedId.toString();
-        return feed + "-->" + datasetName;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionRequest.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionRequest.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionRequest.java
deleted file mode 100644
index 6230eac..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConnectionRequest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
-
-/**
- * A request for connecting a feed to a dataset.
- * <p>
- * All attributes except the connection status are fixed at construction time; the status starts out as
- * {@link ConnectionStatus#INITIALIZED}.
- */
-public class FeedConnectionRequest {
-
-    /** Lifecycle states of a connection request. */
-    public enum ConnectionStatus {
-        /** Initial state upon creating a connection request. */
-        INITIALIZED,
-
-        /** Connection established; feed is receiving data. */
-        ACTIVE,
-
-        /** Connection removed; feed is not receiving data. */
-        INACTIVE,
-
-        /** Connection request failed. */
-        FAILED
-    }
-
-    /** Feed joint on the feed pipeline that serves as the source for this subscription. */
-    private final FeedJointKey feedJointKey;
-
-    /** Location in the source feed pipeline from where feed tuples are received. */
-    private final ConnectionLocation connectionLocation;
-
-    /** Functions that need to be applied in sequence after the data hand-off at the source feed joint. */
-    private final List<String> functionsToApply;
-
-    /** Status associated with the subscription. */
-    private ConnectionStatus connectionStatus = ConnectionStatus.INITIALIZED;
-
-    /** Name of the policy that governs feed ingestion. */
-    private final String policy;
-
-    /** Parameters of the policy associated with the feed connection. */
-    private final Map<String, String> policyParameters;
-
-    /** Target dataset associated with the connection request. */
-    private final String targetDataset;
-
-    /** Id of the receiving feed, as passed to the constructor. */
-    private final FeedId receivingFeedId;
-
-    /**
-     * @param feedPointKey       key of the source feed joint
-     * @param connectionLocation location in the source pipeline from where tuples are received
-     * @param functionsToApply   functions to apply in sequence after the hand-off
-     * @param targetDataset      name of the target dataset
-     * @param policy             name of the ingestion policy
-     * @param policyParameters   parameters of the ingestion policy
-     * @param receivingFeedId    id of the receiving feed
-     */
-    public FeedConnectionRequest(FeedJointKey feedPointKey, ConnectionLocation connectionLocation,
-            List<String> functionsToApply, String targetDataset, String policy,
-            Map<String, String> policyParameters, FeedId receivingFeedId) {
-        this.feedJointKey = feedPointKey;
-        this.connectionLocation = connectionLocation;
-        this.functionsToApply = functionsToApply;
-        this.policy = policy;
-        this.policyParameters = policyParameters;
-        this.targetDataset = targetDataset;
-        this.receivingFeedId = receivingFeedId;
-    }
-
-    /** @return the key of the source feed joint */
-    public FeedJointKey getFeedJointKey() {
-        return feedJointKey;
-    }
-
-    /** @return the current status of this request */
-    public ConnectionStatus getConnectionStatus() {
-        return connectionStatus;
-    }
-
-    /**
-     * Updates the status of this request; the counterpart of {@link #getConnectionStatus()}.
-     *
-     * @param connectionStatus the new status
-     */
-    public void setSubscriptionStatus(ConnectionStatus connectionStatus) {
-        this.connectionStatus = connectionStatus;
-    }
-
-    /** @return the name of the ingestion policy */
-    public String getPolicy() {
-        return policy;
-    }
-
-    /** @return the name of the target dataset */
-    public String getTargetDataset() {
-        return targetDataset;
-    }
-
-    /** @return the connection location passed to the constructor */
-    public ConnectionLocation getSubscriptionLocation() {
-        return connectionLocation;
-    }
-
-    /** @return the id of the receiving feed */
-    public FeedId getReceivingFeedId() {
-        return receivingFeedId;
-    }
-
-    /** @return the parameters of the ingestion policy */
-    public Map<String, String> getPolicyParameters() {
-        return policyParameters;
-    }
-
-    /** @return the functions to apply, in application order */
-    public List<String> getFunctionsToApply() {
-        return functionsToApply;
-    }
-
-    /**
-     * @return a one-line description containing the feed joint key, the connection location and the
-     *         comma-separated function list
-     */
-    @Override
-    public String toString() {
-        final StringBuilder text = new StringBuilder("Feed Connection Request ");
-        text.append(feedJointKey);
-        text.append(" [").append(connectionLocation).append("]");
-        text.append(" Apply (").append(StringUtils.join(functionsToApply, ",")).append(")");
-        return text.toString();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConstants.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConstants.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConstants.java
deleted file mode 100644
index 05e554b..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedConstants.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-/**
- * Holder for feed-related constants, grouped into nested classes by topic.
- * The string values are read at runtime; NOTE(review): confirm every consumer before changing any literal.
- */
-public class FeedConstants {
-
- /** String constants naming per-tuple statistics attributes (tuple id, partition, timestamps). */
- public static final class StatisticsConstants {
- public static final String INTAKE_TUPLEID = "intake-tupleid";
- public static final String INTAKE_PARTITION = "intake-partition";
- public static final String INTAKE_TIMESTAMP = "intake-timestamp";
- public static final String COMPUTE_TIMESTAMP = "compute-timestamp";
- public static final String STORE_TIMESTAMP = "store-timestamp";
-
- }
-
- /**
- * String constants for message fields; presumably keys of feed messages (verify against callers).
- * The literals mix hyphen and underscore separators (e.g. "cpu-load" vs "heap_usage").
- */
- public static final class MessageConstants {
- public static final String MESSAGE_TYPE = "message-type";
- public static final String NODE_ID = "nodeId";
- public static final String DATAVERSE = "dataverse";
- public static final String FEED = "feed";
- public static final String DATASET = "dataset";
- public static final String AQL = "aql";
- public static final String RUNTIME_TYPE = "runtime-type";
- public static final String PARTITION = "partition";
- // Same literal as StatisticsConstants.INTAKE_PARTITION.
- public static final String INTAKE_PARTITION = "intake-partition";
- public static final String INFLOW_RATE = "inflow-rate";
- public static final String OUTFLOW_RATE = "outflow-rate";
- public static final String MODE = "mode";
- public static final String CURRENT_CARDINALITY = "current-cardinality";
- public static final String REDUCED_CARDINALITY = "reduced-cardinality";
- public static final String VALUE_TYPE = "value-type";
- public static final String VALUE = "value";
- public static final String CPU_LOAD = "cpu-load";
- public static final String N_RUNTIMES = "n_runtimes";
- public static final String HEAP_USAGE = "heap_usage";
- public static final String OPERAND_ID = "operand-id";
- public static final String COMPUTE_PARTITION_RETAIN_LIMIT = "compute-partition-retain-limit";
- public static final String LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP = "last-persisted-tuple-intake_timestamp";
- public static final String PERSISTENCE_DELAY_WITHIN_LIMIT = "persistence-delay-within-limit";
- public static final String AVERAGE_PERSISTENCE_DELAY = "average-persistence-delay";
- public static final String COMMIT_ACKS = "commit-acks";
- public static final String MAX_WINDOW_ACKED = "max-window-acked";
- public static final String BASE = "base";
- public static final String NOT_APPLICABLE = "N/A";
-
- }
-
- /** Constants used when composing names. */
- public static final class NamingConstants {
- public static final String LIBRARY_NAME_SEPARATOR = "#";
- }
-
- /** Job-level defaults. Unlike the other nested classes, this one is not declared final. */
- public static class JobConstants {
- // NOTE(review): unit is presumably bytes -- confirm against the code that reads it.
- public static final int DEFAULT_FRAME_SIZE = 8192;
- }
-}