You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:05 UTC
[08/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
new file mode 100644
index 0000000..bd1d75c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
@@ -0,0 +1,143 @@
+package edu.uci.ics.asterix.external.util;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import twitter4j.FilterQuery;
+import twitter4j.Twitter;
+import twitter4j.TwitterFactory;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import twitter4j.conf.ConfigurationBuilder;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public class TwitterUtil {
+
+ /** Keys and well-known values looked up in the adapter configuration map. */
+ public static class ConfigurationConstants {
+ // Configuration key read by getFilterQuery to restrict tweets by location.
+ public static final String KEY_LOCATION = "location";
+ // Named location; used as the key of the US entry in the bounding-box map.
+ public static final String LOCATION_US = "US";
+ }
+
+ /**
+ * Predefined geographic bounding boxes, each stored as a 2x2 array of doubles.
+ * NOTE(review): the pairs look like {longitude, latitude} of the south-west and
+ * north-east corners - confirm against twitter4j FilterQuery.locations.
+ */
+ public static class GeoConstants {
+ // Lookup from a named location to its bounding box. Initialized BEFORE US below
+ // (static fields are initialized in declaration order), so
+ // initializeBoundingBoxes() must not read GeoConstants.US - it would still be null.
+ // The field is neither final nor unmodifiable; callers can alter it.
+ public static Map<String, double[][]> boundingBoxes = initializeBoundingBoxes();
+ // Bounding box for the US; same literal values as the map entry for LOCATION_US.
+ public static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } };
+ }
+
+    /**
+     * Creates the map of named locations to their bounding boxes.
+     * The US box is spelled out as a literal here on purpose: this method runs while
+     * GeoConstants is still initializing, before GeoConstants.US has been assigned.
+     *
+     * @return a new mutable map containing the entry for LOCATION_US
+     */
+    private static Map<String, double[][]> initializeBoundingBoxes() {
+        double[][] usBox = { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } };
+        Map<String, double[][]> boxesByName = new HashMap<String, double[][]>();
+        boxesByName.put(ConfigurationConstants.LOCATION_US, usBox);
+        return boxesByName;
+    }
+
+    /**
+     * Builds a location-based {@link FilterQuery} from the adapter configuration.
+     * <p>
+     * The value stored under {@link ConfigurationConstants#KEY_LOCATION} is interpreted as
+     * either four comma-separated coordinates (filling a 2x2 array row by row) or the
+     * name of a predefined box in {@link GeoConstants#boundingBoxes}.
+     *
+     * @param configuration
+     *            adapter configuration; must not be null
+     * @return a query restricted to the requested bounding box, or null when no location
+     *         is configured or the named location is unknown
+     * @throws AsterixException
+     *             if a comma-separated location has fewer than four values or a value
+     *             is not a valid number
+     */
+    public static FilterQuery getFilterQuery(Map<String, String> configuration) throws AsterixException {
+        String locationValue = configuration.get(ConfigurationConstants.KEY_LOCATION);
+        if (locationValue == null) {
+            return null;
+        }
+
+        double[][] locations;
+        if (locationValue.contains(",")) {
+            String[] coordinatesString = locationValue.trim().split(",");
+            // Without this check a short list escaped as an unchecked
+            // ArrayIndexOutOfBoundsException instead of the declared AsterixException.
+            if (coordinatesString.length < 4) {
+                throw new AsterixException("Incorrect location value " + locationValue
+                        + "; expected 4 comma-separated coordinates");
+            }
+            locations = new double[2][2];
+            // Values beyond the fourth are ignored, as before.
+            for (int k = 0; k < 4; k++) {
+                try {
+                    locations[k / 2][k % 2] = Double.parseDouble(coordinatesString[k]);
+                } catch (NumberFormatException ne) {
+                    throw new AsterixException("Incorrect coordinate value " + coordinatesString[k]);
+                }
+            }
+        } else {
+            locations = GeoConstants.boundingBoxes.get(locationValue);
+        }
+
+        if (locations == null) {
+            return null;
+        }
+        FilterQuery filterQuery = new FilterQuery();
+        filterQuery.locations(locations);
+        return filterQuery;
+    }
+
+    /**
+     * Creates a {@link Twitter} client configured with the OAuth credentials found in
+     * the given configuration.
+     *
+     * @param configuration
+     *            adapter configuration holding the OAuth entries
+     * @return the instance produced by a TwitterFactory built from that configuration
+     */
+    public static Twitter getTwitterService(Map<String, String> configuration) {
+        return new TwitterFactory(getAuthConfiguration(configuration).build()).getInstance();
+    }
+
+    /**
+     * Creates a {@link TwitterStream} configured with the OAuth credentials found in
+     * the given configuration.
+     *
+     * @param configuration
+     *            adapter configuration holding the OAuth entries
+     * @return the instance produced by a TwitterStreamFactory built from that configuration
+     */
+    public static TwitterStream getTwitterStream(Map<String, String> configuration) {
+        ConfigurationBuilder authBuilder = getAuthConfiguration(configuration);
+        TwitterStreamFactory streamFactory = new TwitterStreamFactory(authBuilder.build());
+        TwitterStream stream = streamFactory.getInstance();
+        return stream;
+    }
+
+    /**
+     * Prepares a twitter4j {@link ConfigurationBuilder} with debug enabled and the four
+     * OAuth values copied from the configuration map. Entries missing from the map are
+     * passed to the builder as null.
+     *
+     * @param configuration
+     *            adapter configuration holding the OAuth entries
+     * @return the populated builder (not yet built)
+     */
+    private static ConfigurationBuilder getAuthConfiguration(Map<String, String> configuration) {
+        ConfigurationBuilder builder = new ConfigurationBuilder();
+        builder.setDebugEnabled(true);
+        builder.setOAuthConsumerKey(configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY));
+        builder.setOAuthConsumerSecret(configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET));
+        builder.setOAuthAccessToken(configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN));
+        builder.setOAuthAccessTokenSecret(configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET));
+        return builder;
+    }
+
+    /**
+     * Ensures the configuration map carries the four OAuth credential entries.
+     * <p>
+     * In "file" mode (the default when no mode is configured) the credentials are read
+     * from a properties resource on the classpath and copied into the map; the resource
+     * name comes from the "authentication.file" entry or falls back to
+     * {@link AuthenticationConstants#DEFAULT_AUTH_FILE}. In "explicit" mode the map is
+     * left untouched. Any other mode value is ignored.
+     *
+     * @param configuration
+     *            adapter configuration; updated in place in file mode
+     * @throws AsterixException
+     *             if the credentials resource is missing or cannot be read
+     */
+    public static void initializeConfigurationWithAuthInfo(Map<String, String> configuration) throws AsterixException {
+        String authMode = configuration.get(AuthenticationConstants.AUTHENTICATION_MODE);
+        if (authMode == null) {
+            authMode = AuthenticationConstants.AUTHENTICATION_MODE_FILE;
+        }
+        try {
+            switch (authMode) {
+                case AuthenticationConstants.AUTHENTICATION_MODE_FILE:
+                    String authFile = configuration.get(AuthenticationConstants.OAUTH_AUTHENTICATION_FILE);
+                    if (authFile == null) {
+                        authFile = AuthenticationConstants.DEFAULT_AUTH_FILE;
+                    }
+                    Properties prop = new Properties();
+                    // try-with-resources closes the stream even when load() fails.
+                    try (InputStream in = TwitterUtil.class.getResourceAsStream(authFile)) {
+                        // getResourceAsStream returns null for a missing resource; report
+                        // that explicitly instead of failing with a message-less NPE.
+                        if (in == null) {
+                            throw new java.io.FileNotFoundException("authentication file " + authFile
+                                    + " not found on the classpath");
+                        }
+                        prop.load(in);
+                    }
+                    configuration.put(AuthenticationConstants.OAUTH_CONSUMER_KEY,
+                            prop.getProperty(AuthenticationConstants.OAUTH_CONSUMER_KEY));
+                    configuration.put(AuthenticationConstants.OAUTH_CONSUMER_SECRET,
+                            prop.getProperty(AuthenticationConstants.OAUTH_CONSUMER_SECRET));
+                    configuration.put(AuthenticationConstants.OAUTH_ACCESS_TOKEN,
+                            prop.getProperty(AuthenticationConstants.OAUTH_ACCESS_TOKEN));
+                    configuration.put(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET,
+                            prop.getProperty(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET));
+                    break;
+                case AuthenticationConstants.AUTHENTICATION_MODE_EXPLICIT:
+                    // Credentials are expected to be in the configuration already.
+                    break;
+            }
+        } catch (Exception e) {
+            // Keep the original exception as the cause so the stack trace is not lost.
+            throw new AsterixException("Incorrect configuration! unable to load authentication credentials "
+                    + e.getMessage(), e);
+        }
+    }
+
+ /**
+ * Keys and values related to OAuth authentication. The four OAUTH_* credential keys
+ * are used both as configuration-map keys and as property names in the
+ * authentication file.
+ */
+ public static final class AuthenticationConstants {
+ public static final String OAUTH_CONSUMER_KEY = "consumer.key";
+ public static final String OAUTH_CONSUMER_SECRET = "consumer.secret";
+ public static final String OAUTH_ACCESS_TOKEN = "access.token";
+ public static final String OAUTH_ACCESS_TOKEN_SECRET = "access.token.secret";
+ // Configuration key naming the classpath resource that holds the credentials.
+ public static final String OAUTH_AUTHENTICATION_FILE = "authentication.file";
+ // Configuration key selecting how credentials are obtained; "file" is assumed when absent.
+ public static final String AUTHENTICATION_MODE = "authentication.mode";
+ public static final String AUTHENTICATION_MODE_FILE = "file";
+ public static final String AUTHENTICATION_MODE_EXPLICIT = "explicit";
+ public static final String DEFAULT_AUTH_FILE = "/feed/twitter/auth.properties"; // default authentication file
+ }
+
+ /**
+ * Configuration keys named after the Twitter search API; they are not referenced
+ * elsewhere in this class.
+ */
+ public static final class SearchAPIConstants {
+ public static final String QUERY = "query";
+ // NOTE(review): presumably the polling interval between searches - unit not visible here; confirm.
+ public static final String INTERVAL = "interval";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
new file mode 100644
index 0000000..90fddcd
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.IExternalScalarFunction;
+import edu.uci.ics.asterix.external.library.IFunctionFactory;
+
+/**
+ * Factory that supplies {@link AddHashTagsFunction} instances.
+ */
+public class AddHashTagsFactory implements IFunctionFactory {
+
+ @Override
+ public IExternalScalarFunction getExternalFunction() {
+ // A new function object is created on every call.
+ return new AddHashTagsFunction();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
new file mode 100644
index 0000000..93a87f5
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JDouble;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPoint;
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.library.java.JTypeTag;
+import edu.uci.ics.asterix.external.util.Datatypes;
+
+/**
+ * External scalar function that turns an input tweet record into a processed-tweet
+ * record: it copies the id, user screen name, creation time and message text, attaches
+ * a point built from the tweet's latitude/longitude, and collects every
+ * space-separated token of the message that starts with '#' into a topics list.
+ */
+public class AddHashTagsFunction implements IExternalScalarFunction {
+
+    // Both objects are allocated once in initialize() and reused by every evaluate() call.
+    private JUnorderedList list = null;
+    private JPoint location = null;
+
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        list = new JUnorderedList(functionHelper.getObject(JTypeTag.STRING));
+        location = new JPoint(0, 0);
+    }
+
+    @Override
+    public void deinitialize() {
+    }
+
+    /**
+     * Builds the output record for the tweet passed as argument 0 and sets it as the
+     * function result.
+     *
+     * @param functionHelper
+     *            gives access to the argument, the reusable result object and typed
+     *            value objects
+     * @throws Exception
+     *             propagated from the record and helper operations
+     */
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        list.clear();
+        JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+        JString text = (JString) inputRecord.getValueByName(Datatypes.Tweet.MESSAGE);
+        JDouble latitude = (JDouble) inputRecord.getValueByName(Datatypes.Tweet.LATITUDE);
+        JDouble longitude = (JDouble) inputRecord.getValueByName(Datatypes.Tweet.LONGITUDE);
+
+        if (latitude != null && longitude != null) {
+            location.setValue(latitude.getValue(), longitude.getValue());
+        } else {
+            // The point is shared across calls: reset it to the initial (0, 0) so a tweet
+            // without coordinates does not inherit the previous tweet's location.
+            location.setValue(0, 0);
+        }
+
+        // Hashtags are the space-separated tokens that begin with '#'.
+        String[] tokens = text.getValue().split(" ");
+        for (String tk : tokens) {
+            if (tk.startsWith("#")) {
+                JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
+                newField.setValue(tk);
+                list.add(newField);
+            }
+        }
+
+        JRecord outputRecord = (JRecord) functionHelper.getResultObject();
+        outputRecord.setField(Datatypes.Tweet.ID, inputRecord.getValueByName(Datatypes.Tweet.ID));
+
+        JRecord userRecord = (JRecord) inputRecord.getValueByName(Datatypes.Tweet.USER);
+        outputRecord.setField(Datatypes.ProcessedTweet.USER_NAME,
+                userRecord.getValueByName(Datatypes.Tweet.SCREEN_NAME));
+
+        outputRecord.setField(Datatypes.ProcessedTweet.LOCATION, location);
+        outputRecord.setField(Datatypes.Tweet.CREATED_AT, inputRecord.getValueByName(Datatypes.Tweet.CREATED_AT));
+        outputRecord.setField(Datatypes.Tweet.MESSAGE, text);
+        outputRecord.setField(Datatypes.ProcessedTweet.TOPICS, list);
+
+        // NOTE(review): the topics are also added to the INPUT record although the result
+        // is the separate output record - kept as-is; confirm whether this is intended.
+        inputRecord.addField(Datatypes.ProcessedTweet.TOPICS, list);
+        functionHelper.setResult(outputRecord);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
new file mode 100644
index 0000000..a2bec69
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+
+/**
+ * Factory that supplies {@link AddHashTagsInPlaceFunction} instances.
+ */
+public class AddHashTagsInPlaceFactory implements IFunctionFactory {
+
+ @Override
+ public IExternalScalarFunction getExternalFunction() {
+ // A new function object is created on every call.
+ return new AddHashTagsInPlaceFunction();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
new file mode 100644
index 0000000..a3f6f70
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.library.java.JTypeTag;
+import edu.uci.ics.asterix.external.util.Datatypes;
+
+/**
+ * External scalar function that extends the input tweet record itself: every
+ * space-separated token of the message text that starts with '#' is collected into a
+ * list, the list is added to the input record as the topics field, and that same
+ * record is returned as the result.
+ */
+public class AddHashTagsInPlaceFunction implements IExternalScalarFunction {
+
+    // Allocated once in initialize() and cleared at the start of each evaluate() call.
+    private JUnorderedList list = null;
+
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        list = new JUnorderedList(functionHelper.getObject(JTypeTag.STRING));
+    }
+
+    @Override
+    public void deinitialize() {
+    }
+
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        list.clear();
+        JRecord tweet = (JRecord) functionHelper.getArgument(0);
+        JString message = (JString) tweet.getValueByName(Datatypes.Tweet.MESSAGE);
+
+        for (String token : message.getValue().split(" ")) {
+            if (!token.startsWith("#")) {
+                continue;
+            }
+            JString hashTag = (JString) functionHelper.getObject(JTypeTag.STRING);
+            hashTag.setValue(token);
+            list.add(hashTag);
+        }
+
+        tweet.addField(Datatypes.ProcessedTweet.TOPICS, list);
+        functionHelper.setResult(tweet);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
index 58995c2..ec04541 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
@@ -53,9 +53,6 @@ public class UpperCaseFunction implements IExternalScalarFunction {
JRecord result = (JRecord) functionHelper.getResultObject();
result.setField("id", id);
result.setField("text", text);
- JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
- newField.setValue(text.getValue().substring(random.nextInt(text.getValue().length())));
- result.addField("substring", newField);
functionHelper.setResult(result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
index 07f1a40..7a2597e 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
@@ -23,8 +23,9 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter.DataExchangeMode;
import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -43,8 +44,8 @@ public class TestTypedAdapter extends StreamBasedAdapter implements IFeedAdapter
private DummyGenerator generator;
public TestTypedAdapter(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
- Map<String, String> configuration) throws IOException {
- super(parserFactory, sourceDatatype, ctx);
+ Map<String, String> configuration, int partition) throws IOException {
+ super(parserFactory, sourceDatatype, ctx, partition);
pos = new PipedOutputStream();
pis = new PipedInputStream(pos);
this.configuration = configuration;
@@ -131,4 +132,9 @@ public class TestTypedAdapter extends StreamBasedAdapter implements IFeedAdapter
generator.stop();
}
+ @Override
+ public boolean handleException(Exception e) {
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
index b042e9c..5416ce2 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -17,19 +17,23 @@ package edu.uci.ics.asterix.external.library.adapter;
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory.SupportedOperation;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-public class TestTypedAdapterFactory implements ITypedAdapterFactory {
+public class TestTypedAdapterFactory implements IFeedAdapterFactory {
/**
*
@@ -38,7 +42,7 @@ public class TestTypedAdapterFactory implements ITypedAdapterFactory {
public static final String NAME = "test_typed_adapter";
- private static ARecordType adapterOutputType = initOutputType();
+ private ARecordType outputType;
public static final String KEY_NUM_OUTPUT_RECORDS = "num_output_records";
@@ -50,7 +54,7 @@ public class TestTypedAdapterFactory implements ITypedAdapterFactory {
}
private static ARecordType initOutputType() {
- String[] fieldNames = new String[] { "tweetid", "message-text" };
+ String[] fieldNames = new String[] { "id", "message-text" };
IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, BuiltinType.ASTRING };
ARecordType outputType = null;
try {
@@ -67,29 +71,36 @@ public class TestTypedAdapterFactory implements ITypedAdapterFactory {
}
@Override
- public AdapterType getAdapterType() {
- return AdapterType.TYPED;
- }
-
- @Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
return new AlgebricksCountPartitionConstraint(1);
}
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- ITupleParserFactory tupleParserFactory = new AdmSchemafullRecordParserFactory(adapterOutputType);
- return new TestTypedAdapter(tupleParserFactory, adapterOutputType, ctx, configuration);
+ ITupleParserFactory tupleParserFactory = new AsterixTupleParserFactory(configuration, outputType,
+ InputDataFormat.ADM);
+ return new TestTypedAdapter(tupleParserFactory, outputType, ctx, configuration, partition);
}
@Override
public ARecordType getAdapterOutputType() {
- return adapterOutputType;
+ return outputType;
}
@Override
- public void configure(Map<String, String> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
+ this.outputType = outputType;
+ }
+
+ @Override
+ public boolean isRecordTrackingEnabled() {
+ return false;
+ }
+
+ @Override
+ public IIntakeProgressTracker createIntakeProgressTracker() {
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/resources/library_descriptor.xml
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/resources/library_descriptor.xml b/asterix-external-data/src/test/resources/library_descriptor.xml
new file mode 100644
index 0000000..e35288f
--- /dev/null
+++ b/asterix-external-data/src/test/resources/library_descriptor.xml
@@ -0,0 +1,76 @@
+<externalLibrary xmlns="library">
+ <language>JAVA</language>
+ <libraryFunctions>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>parseTweet</name>
+ <arguments>TweetInputType</arguments>
+ <return_type>TweetOutputType</return_type>
+ <definition>edu.uci.ics.asterix.external.library.ParseTweetFactory
+ </definition>
+ </libraryFunction>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>addHashTags</name>
+ <arguments>Tweet</arguments>
+ <return_type>ProcessedTweet</return_type>
+ <definition>edu.uci.ics.asterix.external.library.AddHashTagsFactory
+ </definition>
+ </libraryFunction>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>addHashTagsInPlace</name>
+ <arguments>Tweet</arguments>
+ <return_type>ProcessedTweet</return_type>
+ <definition>edu.uci.ics.asterix.external.library.AddHashTagsInPlaceFactory
+ </definition>
+ </libraryFunction>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>mysum</name>
+ <arguments>AINT32,AINT32</arguments>
+ <return_type>AINT32</return_type>
+ <definition>edu.uci.ics.asterix.external.library.SumFactory
+ </definition>
+ </libraryFunction>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>getCapital</name>
+ <arguments>ASTRING</arguments>
+ <return_type>CountryCapitalType</return_type>
+ <definition>edu.uci.ics.asterix.external.library.CapitalFinderFactory
+ </definition>
+ </libraryFunction>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>toUpper</name>
+ <arguments>TextType</arguments>
+ <return_type>TextType</return_type>
+ <definition>edu.uci.ics.asterix.external.library.UpperCaseFactory
+ </definition>
+ </libraryFunction>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>allTypes</name>
+ <arguments>AllType</arguments>
+ <return_type>AllType</return_type>
+ <definition>edu.uci.ics.asterix.external.library.AllTypesFactory
+ </definition>
+ </libraryFunction>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>echoDelay</name>
+ <arguments>TweetMessageType</arguments>
+ <return_type>TweetMessageType</return_type>
+ <definition>edu.uci.ics.asterix.external.library.EchoDelayFactory
+ </definition>
+ </libraryFunction>
+ </libraryFunctions>
+ <libraryAdapters>
+ <libraryAdapter>
+ <name>test_typed_adapter</name>
+ <factory_class>edu.uci.ics.asterix.external.library.adapter.TestTypedAdapterFactory
+ </factory_class>
+ </libraryAdapter>
+ </libraryAdapters>
+</externalLibrary>
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/test/resources/text_functions.xml
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/text_functions.xml
deleted file mode 100644
index 8c7a92c..0000000
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<externalLibrary xmlns="library">
- <language>JAVA</language>
- <libraryFunctions>
- <libraryFunction>
- <function_type>SCALAR</function_type>
- <name>parseTweet</name>
- <arguments>TweetInputType</arguments>
- <return_type>TweetOutputType</return_type>
- <definition>edu.uci.ics.asterix.external.library.ParseTweetFactory
- </definition>
- </libraryFunction>
- <libraryFunction>
- <function_type>SCALAR</function_type>
- <name>mysum</name>
- <arguments>AINT32,AINT32</arguments>
- <return_type>AINT32</return_type>
- <definition>edu.uci.ics.asterix.external.library.SumFactory
- </definition>
- </libraryFunction>
- <libraryFunction>
- <function_type>SCALAR</function_type>
- <name>getCapital</name>
- <arguments>ASTRING</arguments>
- <return_type>CountryCapitalType</return_type>
- <definition>edu.uci.ics.asterix.external.library.CapitalFinderFactory
- </definition>
- </libraryFunction>
- <libraryFunction>
- <function_type>SCALAR</function_type>
- <name>toUpper</name>
- <arguments>TextType</arguments>
- <return_type>TextType</return_type>
- <definition>edu.uci.ics.asterix.external.library.UpperCaseFactory
- </definition>
- </libraryFunction>
- <libraryFunction>
- <function_type>SCALAR</function_type>
- <name>allTypes</name>
- <arguments>AllType</arguments>
- <return_type>AllType</return_type>
- <definition>edu.uci.ics.asterix.external.library.AllTypesFactory
- </definition>
- </libraryFunction>
- <libraryFunction>
- <function_type>SCALAR</function_type>
- <name>echoDelay</name>
- <arguments>TweetMessageType</arguments>
- <return_type>TweetMessageType</return_type>
- <definition>edu.uci.ics.asterix.external.library.EchoDelayFactory
- </definition>
- </libraryFunction>
- </libraryFunctions>
- <libraryAdapters>
- <libraryAdapter>
- <name>test_typed_adapter</name>
- <factory_class>edu.uci.ics.asterix.external.library.adapter.TestTypedAdapterFactory</factory_class>
- </libraryAdapter>
- </libraryAdapters>
-</externalLibrary>
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixFaultToleranceIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixFaultToleranceIT.java b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixFaultToleranceIT.java
deleted file mode 100644
index c532de5..0000000
--- a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixFaultToleranceIT.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.installer.test;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.logging.Logger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runners.Parameterized.Parameters;
-
-import edu.uci.ics.asterix.test.aql.TestsUtils;
-import edu.uci.ics.asterix.testframework.context.TestCaseContext;
-
-public class AsterixFaultToleranceIT {
-
- private static final String PATH_BASE = "src/test/resources/integrationts/fault-tolerance";
- private static final String PATH_ACTUAL = "ittest/";
- private static final Logger LOGGER = Logger.getLogger(AsterixFaultToleranceIT.class.getName());
- private static List<TestCaseContext> testCaseCollection;
-
- @BeforeClass
- public static void setUp() throws Exception {
- AsterixInstallerIntegrationUtil.init();
- TestCaseContext.Builder b = new TestCaseContext.Builder();
- testCaseCollection = b.build(new File(PATH_BASE));
- File outdir = new File(PATH_ACTUAL);
- outdir.mkdirs();
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- AsterixInstallerIntegrationUtil.deinit();
- File outdir = new File(PATH_ACTUAL);
- File[] files = outdir.listFiles();
- if (files == null || files.length == 0) {
- outdir.delete();
- }
- }
-
- @Parameters
- public static Collection<Object[]> tests() throws Exception {
- Collection<Object[]> testArgs = new ArrayList<Object[]>();
- return testArgs;
- }
-
- @Test
- public void test() throws Exception {
- for (TestCaseContext testCaseCtx : testCaseCollection) {
- TestsUtils.executeTest(PATH_ACTUAL, testCaseCtx, null, false);
- }
- }
-
- public static void main(String[] args) throws Exception {
- try {
- setUp();
- new AsterixFaultToleranceIT().test();
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.info("TEST CASE(S) FAILED");
- } finally {
- tearDown();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql
deleted file mode 100644
index 1f0678e..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql
+++ /dev/null
@@ -1,28 +0,0 @@
-drop dataverse feeds if exists;
-create dataverse feeds;
-use dataverse feeds;
-
-create type TwitterUserType as closed {
- screen-name: string,
- lang: string,
- friends_count: int32,
- statuses_count: int32,
- name: string,
- followers_count: int32
-}
-
-create type TweetMessageType as closed {
- tweetid: int64,
- user: TwitterUserType,
- sender-location: point,
- send-time: datetime,
- referred-topics: {{ string }},
- message-text: string
-}
-
-create dataset Tweets(TweetMessageType)
-primary key tweetid;
-
-create feed TwitterFirehose
-using twitter_firehose
-(("duration"="30"),("tps"="50"),("tput-duration"="5"),("mode"="controlled"));
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.2.update.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.2.update.aql
deleted file mode 100644
index 64dbf25..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.2.update.aql
+++ /dev/null
@@ -1,3 +0,0 @@
-use dataverse feeds;
-
-connect feed TwitterFirehose to dataset Tweets;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.sleep.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.sleep.aql
deleted file mode 100644
index 5caff40..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.sleep.aql
+++ /dev/null
@@ -1 +0,0 @@
-10000
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql
deleted file mode 100644
index 2d8a23e..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql
+++ /dev/null
@@ -1 +0,0 @@
-stop -n asterix
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.mgx.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.mgx.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.mgx.aql
deleted file mode 100644
index 4e99f33..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.mgx.aql
+++ /dev/null
@@ -1 +0,0 @@
-start -n asterix
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.sleep.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.sleep.aql
deleted file mode 100644
index c5da56a..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.sleep.aql
+++ /dev/null
@@ -1 +0,0 @@
-40000
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.7.query.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.7.query.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.7.query.aql
deleted file mode 100644
index d03b9fe..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.7.query.aql
+++ /dev/null
@@ -1,10 +0,0 @@
-use dataverse feeds;
-
-let $numTuples:=count(for $x in dataset Tweets
-return $x)
-let $result:=if($numTuples > 225)
-then
- 1
-else
- 0
-return $result
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.1.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.1.adm b/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.1.adm
deleted file mode 100644
index 3c13d5f..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.1.adm
+++ /dev/null
@@ -1,2 +0,0 @@
-[ 1
- ]
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.2.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.2.adm b/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.2.adm
deleted file mode 100644
index 3c13d5f..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/results/feeds/IN1-cluster-restart/IN1-cluster-restart.2.adm
+++ /dev/null
@@ -1,2 +0,0 @@
-[ 1
- ]
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml b/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml
deleted file mode 100644
index 0d9ed23..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml
+++ /dev/null
@@ -1,10 +0,0 @@
-<test-suite xmlns="urn:xml.testframework.asterix.ics.uci.edu" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
- <test-group name="fault-tolerance">
- <test-case FilePath="feeds">
- <compilation-unit name="IN1-cluster-restart">
- <output-dir compare="Text">IN1-cluster-restart</output-dir>
- </compilation-unit>
- </test-case>
- </test-group>
-</test-suite>
-
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm b/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm
index b3bf441..a9bcf09 100644
--- a/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm
+++ b/asterix-installer/src/test/resources/integrationts/library/results/library-metadata/functionDataset/functionDataset.1.adm
@@ -1,7 +1,9 @@
-[ { "DataverseName": "externallibtest", "Name": "testlib#allTypes", "Arity": "1", "Params": [ "AllType" ], "ReturnType": "AllType", "Definition": "edu.uci.ics.asterix.external.library.AllTypesFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
-, { "DataverseName": "externallibtest", "Name": "testlib#echoDelay", "Arity": "1", "Params": [ "TweetMessageType" ], "ReturnType": "TweetMessageType", "Definition": "edu.uci.ics.asterix.external.library.EchoDelayFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
-, { "DataverseName": "externallibtest", "Name": "testlib#getCapital", "Arity": "1", "Params": [ "ASTRING" ], "ReturnType": "CountryCapitalType", "Definition": "edu.uci.ics.asterix.external.library.CapitalFinderFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
-, { "DataverseName": "externallibtest", "Name": "testlib#mysum", "Arity": "2", "Params": [ "AINT32", "AINT32" ], "ReturnType": "AINT32", "Definition": "edu.uci.ics.asterix.external.library.SumFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
-, { "DataverseName": "externallibtest", "Name": "testlib#parseTweet", "Arity": "1", "Params": [ "TweetInputType" ], "ReturnType": "TweetOutputType", "Definition": "edu.uci.ics.asterix.external.library.ParseTweetFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
-, { "DataverseName": "externallibtest", "Name": "testlib#toUpper", "Arity": "1", "Params": [ "TextType" ], "ReturnType": "TextType", "Definition": "edu.uci.ics.asterix.external.library.UpperCaseFactory\n\t\t\t", "Language": "JAVA", "Kind": "SCALAR" }
+[ { "DataverseName": "externallibtest", "Name": "testlib#addHashTags", "Arity": "1", "Params": [ "Tweet" ], "ReturnType": "ProcessedTweet", "Definition": "edu.uci.ics.asterix.external.library.AddHashTagsFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#addHashTagsInPlace", "Arity": "1", "Params": [ "Tweet" ], "ReturnType": "ProcessedTweet", "Definition": "edu.uci.ics.asterix.external.library.AddHashTagsInPlaceFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#allTypes", "Arity": "1", "Params": [ "AllType" ], "ReturnType": "AllType", "Definition": "edu.uci.ics.asterix.external.library.AllTypesFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#echoDelay", "Arity": "1", "Params": [ "TweetMessageType" ], "ReturnType": "TweetMessageType", "Definition": "edu.uci.ics.asterix.external.library.EchoDelayFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#getCapital", "Arity": "1", "Params": [ "ASTRING" ], "ReturnType": "CountryCapitalType", "Definition": "edu.uci.ics.asterix.external.library.CapitalFinderFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#mysum", "Arity": "2", "Params": [ "AINT32", "AINT32" ], "ReturnType": "AINT32", "Definition": "edu.uci.ics.asterix.external.library.SumFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#parseTweet", "Arity": "1", "Params": [ "TweetInputType" ], "ReturnType": "TweetOutputType", "Definition": "edu.uci.ics.asterix.external.library.ParseTweetFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib#toUpper", "Arity": "1", "Params": [ "TextType" ], "ReturnType": "TextType", "Definition": "edu.uci.ics.asterix.external.library.UpperCaseFactory", "Language": "JAVA", "Kind": "SCALAR" }
]
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
index ef21a16..be9ba0e 100644
--- a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
+++ b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
@@ -39,7 +39,7 @@
</test-case>
</test-group>
<test-group name="library-feeds">
- <test-case FilePath="library-feeds" category="slow">
+ <test-case FilePath="library-feeds">
<compilation-unit name="feed_ingest">
<output-dir compare="Text">feed_ingest</output-dir>
</compilation-unit>
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
index 6eb60f0..12e50dc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
@@ -23,7 +23,6 @@ import java.util.Map;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.entities.CompactionPolicy;
@@ -32,7 +31,6 @@ import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
@@ -64,9 +62,7 @@ public class MetadataCache {
protected final Map<FunctionSignature, Function> functions = new HashMap<FunctionSignature, Function>();
// Key is adapter dataverse. Key of value map is the adapter name
protected final Map<String, Map<String, DatasourceAdapter>> adapters = new HashMap<String, Map<String, DatasourceAdapter>>();
- // Key is FeedId
- protected final Map<FeedConnectionId, FeedActivity> feedActivity = new HashMap<FeedConnectionId, FeedActivity>();
-
+
// Key is DataverseName, Key of the value map is the Policy name
protected final Map<String, Map<String, FeedPolicy>> feedPolicies = new HashMap<String, Map<String, FeedPolicy>>();
// Key is library dataverse. Key of value map is the library name
@@ -110,7 +106,6 @@ public class MetadataCache {
synchronized (datatypes) {
synchronized (functions) {
synchronized (adapters) {
- synchronized (feedActivity) {
synchronized (libraries) {
synchronized (compactionPolicies) {
dataverses.clear();
@@ -120,7 +115,6 @@ public class MetadataCache {
datatypes.clear();
functions.clear();
adapters.clear();
- feedActivity.clear();
libraries.clear();
compactionPolicies.clear();
}
@@ -133,7 +127,7 @@ public class MetadataCache {
}
}
}
- }
+
public Object addDataverseIfNotExists(Dataverse dataverse) {
synchronized (dataverses) {
@@ -240,7 +234,6 @@ public class MetadataCache {
synchronized (functions) {
synchronized (adapters) {
synchronized (libraries) {
- synchronized (feedActivity) {
synchronized (feeds) {
synchronized (compactionPolicies) {
datasets.remove(dataverse.getDataverseName());
@@ -257,19 +250,8 @@ public class MetadataCache {
for (FunctionSignature signature : markedFunctionsForRemoval) {
functions.remove(signature);
}
- List<FeedConnectionId> feedActivitiesMarkedForRemoval = new ArrayList<FeedConnectionId>();
- for (FeedConnectionId fid : feedActivity.keySet()) {
- if (fid.getDataverse().equals(dataverse.getDataverseName())) {
- feedActivitiesMarkedForRemoval.add(fid);
- }
- }
- for (FeedConnectionId fid : feedActivitiesMarkedForRemoval) {
- feedActivity.remove(fid);
- }
-
libraries.remove(dataverse.getDataverseName());
feeds.remove(dataverse.getDataverseName());
-
return dataverses.remove(dataverse.getDataverseName());
}
}
@@ -281,7 +263,7 @@ public class MetadataCache {
}
}
}
- }
+
public Object dropDataset(Dataset dataset) {
synchronized (datasets) {
@@ -503,9 +485,9 @@ public class MetadataCache {
adaptersInDataverse = new HashMap<String, DatasourceAdapter>();
adapters.put(adapter.getAdapterIdentifier().getNamespace(), adaptersInDataverse);
}
- DatasourceAdapter adapterObject = adaptersInDataverse.get(adapter.getAdapterIdentifier().getAdapterName());
+ DatasourceAdapter adapterObject = adaptersInDataverse.get(adapter.getAdapterIdentifier().getName());
if (adapterObject == null) {
- return adaptersInDataverse.put(adapter.getAdapterIdentifier().getAdapterName(), adapter);
+ return adaptersInDataverse.put(adapter.getAdapterIdentifier().getName(), adapter);
}
return null;
}
@@ -516,28 +498,14 @@ public class MetadataCache {
Map<String, DatasourceAdapter> adaptersInDataverse = adapters.get(adapter.getAdapterIdentifier()
.getNamespace());
if (adaptersInDataverse != null) {
- return adaptersInDataverse.remove(adapter.getAdapterIdentifier().getAdapterName());
+ return adaptersInDataverse.remove(adapter.getAdapterIdentifier().getName());
}
return null;
}
}
- public Object addFeedActivityIfNotExists(FeedActivity fa) {
- synchronized (feedActivity) {
- FeedConnectionId fid = new FeedConnectionId(fa.getDataverseName(), fa.getFeedName(), fa.getDatasetName());
- if (!feedActivity.containsKey(fid)) {
- feedActivity.put(fid, fa);
- }
- }
- return null;
- }
+
- public Object dropFeedActivity(FeedActivity fa) {
- synchronized (feedActivity) {
- FeedConnectionId fid = new FeedConnectionId(fa.getDataverseName(), fa.getFeedName(), fa.getDatasetName());
- return feedActivity.remove(fid);
- }
- }
public Object addLibraryIfNotExists(Library library) {
synchronized (libraries) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index dbfe32c..4842934 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -24,7 +24,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
@@ -37,8 +36,6 @@ import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
@@ -717,29 +714,7 @@ public class MetadataManager implements IMetadataManager {
}
return adapter;
}
-
- @Override
- public void registerFeedActivity(MetadataTransactionContext ctx, FeedConnectionId feedId, FeedActivity feedActivity)
- throws MetadataException {
- try {
- metadataNode.registerFeedActivity(ctx.getJobId(), feedId, feedActivity);
- } catch (RemoteException e) {
- throw new MetadataException(e);
- }
- }
-
- @Override
- public FeedActivity getRecentActivityOnFeedConnection(MetadataTransactionContext ctx, FeedConnectionId feedId,
- FeedActivityType... feedActivityTypes) throws MetadataException {
-
- FeedActivity feedActivity = null;
- try {
- feedActivity = metadataNode.getRecentFeedActivity(ctx.getJobId(), feedId, feedActivityTypes);
- } catch (RemoteException e) {
- throw new MetadataException(e);
- }
- return feedActivity;
- }
+
@Override
public void dropLibrary(MetadataTransactionContext ctx, String dataverseName, String libraryName)
@@ -822,18 +797,7 @@ public class MetadataManager implements IMetadataManager {
}
return FeedPolicy;
}
-
- @Override
- public List<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx, String dataverse, String dataset)
- throws MetadataException {
- List<FeedActivity> feedActivities = null;
- try {
- feedActivities = metadataNode.getActiveFeeds(ctx.getJobId(), dataverse, dataset);
- } catch (RemoteException e) {
- throw new MetadataException(e);
- }
- return feedActivities;
- }
+
@Override
public Feed getFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException {
@@ -848,12 +812,14 @@ public class MetadataManager implements IMetadataManager {
@Override
public void dropFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException {
+ Feed feed = null;
try {
+ feed = metadataNode.getFeed(ctx.getJobId(), dataverse, feedName);
metadataNode.dropFeed(ctx.getJobId(), dataverse, feedName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
- ctx.dropFeed(dataverse, feedName);
+ ctx.dropFeed(feed);
}
@Override
@@ -866,28 +832,41 @@ public class MetadataManager implements IMetadataManager {
ctx.addFeed(feed);
}
- @Override
- public List<FeedActivity> getConnectFeedActivitiesForFeed(MetadataTransactionContext ctx, String dataverse,
- String feedName) throws MetadataException {
- List<FeedActivity> feedActivities = null;
+
+ public List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext mdTxnCtx, String dataverse)
+ throws MetadataException {
+ List<DatasourceAdapter> dataverseAdapters;
try {
- feedActivities = metadataNode.getDatasetsServedByFeed(ctx.getJobId(), dataverse, feedName);
+ dataverseAdapters = metadataNode.getDataverseAdapters(mdTxnCtx.getJobId(), dataverse);
} catch (RemoteException e) {
throw new MetadataException(e);
}
- return feedActivities;
+ return dataverseAdapters;
}
-
- public List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext mdTxnCtx, String dataverse)
+
+ public void dropFeedPolicy(MetadataTransactionContext mdTxnCtx, String dataverseName, String policyName)
throws MetadataException {
- List<DatasourceAdapter> dataverseAdapters;
+ FeedPolicy feedPolicy = null;
try {
- dataverseAdapters = metadataNode.getDataverseAdapters(mdTxnCtx.getJobId(), dataverse);
+ feedPolicy = metadataNode.getFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName);
+ metadataNode.dropFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
- return dataverseAdapters;
+ mdTxnCtx.dropFeedPolicy(feedPolicy);
}
+
+ public List<FeedPolicy> getDataversePolicies(MetadataTransactionContext mdTxnCtx, String dataverse)
+ throws MetadataException {
+ List<FeedPolicy> dataverseFeedPolicies;
+ try {
+ dataverseFeedPolicies = metadataNode.getDataversePolicies(mdTxnCtx.getJobId(), dataverse);
+ } catch (RemoteException e) {
+ throw new MetadataException(e);
+ }
+ return dataverseFeedPolicies;
+ }
+
@Override
public List<ExternalFile> getDatasetExternalFiles(MetadataTransactionContext mdTxnCtx, Dataset dataset)
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index a593348..0ff5455 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -17,19 +17,13 @@ package edu.uci.ics.asterix.metadata;
import java.rmi.RemoteException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.asterix.common.transactions.DatasetId;
@@ -51,8 +45,6 @@ import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
@@ -66,7 +58,6 @@ import edu.uci.ics.asterix.metadata.entitytupletranslators.DatasourceAdapterTupl
import edu.uci.ics.asterix.metadata.entitytupletranslators.DatatypeTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.DataverseTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.ExternalFileTupleTranslator;
-import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedActivityTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedPolicyTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.FunctionTupleTranslator;
@@ -74,7 +65,6 @@ import edu.uci.ics.asterix.metadata.entitytupletranslators.IndexTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.LibraryTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeGroupTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeTupleTranslator;
-import edu.uci.ics.asterix.metadata.feeds.FeedActivityIdFactory;
import edu.uci.ics.asterix.metadata.valueextractors.DatasetNameValueExtractor;
import edu.uci.ics.asterix.metadata.valueextractors.DatatypeNameValueExtractor;
import edu.uci.ics.asterix.metadata.valueextractors.MetadataEntityValueExtractor;
@@ -374,7 +364,7 @@ public class MetadataNode implements IMetadataNode {
if (dataverseAdapters != null && dataverseAdapters.size() > 0) {
// Drop all functions in this dataverse.
for (DatasourceAdapter adapter : dataverseAdapters) {
- dropAdapter(jobId, dataverseName, adapter.getAdapterIdentifier().getAdapterName());
+ dropAdapter(jobId, dataverseName, adapter.getAdapterIdentifier().getName());
}
}
@@ -388,6 +378,14 @@ public class MetadataNode implements IMetadataNode {
dropFeed(jobId, dataverseName, feed.getFeedName());
}
}
+
+ List<FeedPolicy> feedPolicies = getDataversePolicies(jobId, dataverseName);
+ if (feedPolicies != null && feedPolicies.size() > 0) {
+ // Drop all feed ingestion policies in this dataverse.
+ for (FeedPolicy feedPolicy : feedPolicies) {
+ dropFeedPolicy(jobId, dataverseName, feedPolicy.getPolicyName());
+ }
+ }
// Delete the dataverse entry from the 'dataverse' dataset.
ITupleReference searchKey = createTuple(dataverseName);
@@ -1152,7 +1150,7 @@ public class MetadataNode implements IMetadataNode {
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, adapterTuple);
} catch (TreeIndexDuplicateKeyException e) {
- throw new MetadataException("A adapter with this name " + adapter.getAdapterIdentifier().getAdapterName()
+ throw new MetadataException("A adapter with this name " + adapter.getAdapterIdentifier().getName()
+ " already exists in dataverse '" + adapter.getAdapterIdentifier().getNamespace() + "'.", e);
} catch (Exception e) {
throw new MetadataException(e);
@@ -1333,76 +1331,7 @@ public class MetadataNode implements IMetadataNode {
return DatasetIdFactory.getMostRecentDatasetId();
}
- @Override
- public void registerFeedActivity(JobId jobId, FeedConnectionId feedId, FeedActivity feedActivity)
- throws MetadataException, RemoteException {
- try {
- if (!FeedActivityIdFactory.isInitialized()) {
- initializeFeedActivityIdFactory(jobId);
- }
- feedActivity.setActivityId(FeedActivityIdFactory.generateFeedActivityId());
- FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
- ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(feedActivity);
- insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, tuple);
- } catch (Exception e) {
- throw new MetadataException(e);
- }
-
- }
-
- @Override
- public FeedActivity getRecentFeedActivity(JobId jobId, FeedConnectionId feedId, FeedActivityType... activityType)
- throws MetadataException, RemoteException {
- try {
- ITupleReference searchKey = createTuple(feedId.getDataverse(), feedId.getFeedName(),
- feedId.getDatasetName());
- FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(false);
- List<FeedActivity> results = new ArrayList<FeedActivity>();
- IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
- tupleReaderWriter);
- searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
- if (!results.isEmpty()) {
- Collections.sort(results);
- if (activityType == null) {
- return results.get(0);
- } else {
- for (FeedActivity result : results) {
- for (FeedActivityType ft : activityType) {
- if (result.getActivityType().equals(ft)) {
- return result;
- }
- }
- }
- }
- }
- return null;
- } catch (Exception e) {
- throw new MetadataException(e);
- }
- }
-
- @Override
- public void initializeFeedActivityIdFactory(JobId jobId) throws MetadataException, RemoteException {
- try {
- ITupleReference searchKey = createTuple();
- FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
- List<FeedActivity> results = new ArrayList<FeedActivity>();
- IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
- tupleReaderWriter);
- searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
- int maxActivityId = 0;
- for (FeedActivity fa : results) {
- if (maxActivityId < fa.getActivityId()) {
- maxActivityId = fa.getActivityId();
- }
- }
- FeedActivityIdFactory.initialize(maxActivityId);
- } catch (Exception e) {
- throw new MetadataException(e);
- }
-
- }
-
+
@Override
public void addFeedPolicy(JobId jobId, FeedPolicy feedPolicy) throws MetadataException, RemoteException {
try {
@@ -1439,57 +1368,7 @@ public class MetadataNode implements IMetadataNode {
}
}
- @Override
- public List<FeedActivity> getActiveFeeds(JobId jobId, String dataverse, String dataset) throws MetadataException,
- RemoteException {
- List<FeedActivity> activeFeeds = new ArrayList<FeedActivity>();
- Map<FeedConnectionId, FeedActivity> aFeeds = new HashMap<FeedConnectionId, FeedActivity>();
- boolean invalidArgs = (dataverse == null && dataset != null);
- if (invalidArgs) {
- throw new MetadataException("Invalid arguments " + dataverse + " " + dataset);
- }
- try {
- ITupleReference searchKey = createTuple();
- FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
- List<FeedActivity> results = new ArrayList<FeedActivity>();
- IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
- tupleReaderWriter);
- searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
- Collections.sort(results); // recent activity first
- FeedConnectionId fid = null;
- Set<FeedConnectionId> terminatedFeeds = new HashSet<FeedConnectionId>();
- for (FeedActivity fa : results) {
- if (dataverse != null) {
- if (dataset != null
- && (!fa.getDataverseName().equals(dataverse) || !dataset.equals(fa.getDatasetName()))) {
- continue;
- }
- }
-
- fid = new FeedConnectionId(fa.getDataverseName(), fa.getFeedName(), fa.getDatasetName());
- switch (fa.getActivityType()) {
- case FEED_BEGIN:
- if (!terminatedFeeds.contains(fid)) {
- if (aFeeds.get(fid) == null || fa.getActivityId() > aFeeds.get(fid).getActivityId()) {
- aFeeds.put(fid, fa);
- }
- }
- break;
- case FEED_END:
- terminatedFeeds.add(fid);
- break;
- default: //ignore
- }
- }
- for (FeedActivity f : aFeeds.values()) {
- System.out.println("ACTIVE FEEDS " + f.getFeedName());
- activeFeeds.add(f);
- }
- return activeFeeds;
- } catch (Exception e) {
- throw new MetadataException(e);
- }
- }
+
@Override
public void addFeed(JobId jobId, Feed feed) throws MetadataException, RemoteException {
@@ -1543,46 +1422,37 @@ public class MetadataNode implements IMetadataNode {
}
- public List<FeedActivity> getDatasetsServedByFeed(JobId jobId, String dataverse, String feedName)
- throws MetadataException, RemoteException {
- List<FeedActivity> feedActivities = new ArrayList<FeedActivity>();
+
+ /**
+ * Removes a feed policy from the FEED_POLICY_DATASET metadata index.
+ * <p>
+ * A search key is built from the dataverse name and the policy name, the matching tuple is fetched with
+ * getTupleToBeDeleted and then handed to deleteTupleFromIndex.
+ *
+ * @param jobId
+ * job under which the index lookup and the delete are issued
+ * @param dataverseName
+ * first component of the search key
+ * @param policyName
+ * second component of the search key; also echoed in the error message
+ * @throws MetadataException
+ * with the message "Unknown feed policy <policyName>" when a TreeIndexException is raised, or
+ * wrapping any other exception raised while looking up or deleting the tuple
+ * @throws RemoteException
+ * declared in the signature; exceptions raised inside the body are wrapped in MetadataException
+ */
+ @Override
+ public void dropFeedPolicy(JobId jobId, String dataverseName, String policyName) throws MetadataException,
+ RemoteException {
try {
- ITupleReference searchKey = createTuple(dataverse, feedName);
- FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(false);
- List<FeedActivity> results = new ArrayList<FeedActivity>();
- IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
- tupleReaderWriter);
- searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
-
- if (!results.isEmpty()) {
- Collections.sort(results); // most recent feed activity
- Set<String> terminatedDatasets = new HashSet<String>();
- Set<String> activeDatasets = new HashSet<String>();
-
- for (FeedActivity result : results) {
- switch (result.getFeedActivityType()) {
- case FEED_BEGIN:
- if (!terminatedDatasets.contains(result.getDatasetName())) {
- feedActivities.add(result);
- activeDatasets.add(result.getDatasetName());
- }
- break;
- case FEED_END:
- if (!activeDatasets.contains(result.getDatasetName())) {
- terminatedDatasets.add(result.getDatasetName());
- }
- break;
- }
-
- }
- }
- return feedActivities;
+ ITupleReference searchKey = createTuple(dataverseName, policyName);
+ ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, searchKey);
+ deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, tuple);
+ // A tree index failure is reported to the caller as an unknown policy, keeping the original cause.
+ } catch (TreeIndexException e) {
+ throw new MetadataException("Unknown feed policy " + policyName, e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
+ /**
+ * Collects the feed policies found in the FEED_POLICY_DATASET metadata index for one dataverse.
+ * <p>
+ * The index is searched with a key built from the dataverse name alone, and every match is decoded into a
+ * FeedPolicy. The returned list is never null; it is empty when the search adds nothing to it.
+ *
+ * @param jobId
+ * job under which the index search is issued
+ * @param dataverse
+ * dataverse name used as the search key
+ * @return the policies produced by the search, in the order the search appended them
+ * @throws MetadataException
+ * wrapping any exception raised while building the key or searching the index
+ * @throws RemoteException
+ * declared in the signature; exceptions raised inside the body are wrapped in MetadataException
+ */
@Override
+ public List<FeedPolicy> getDataversePolicies(JobId jobId, String dataverse) throws MetadataException,
+ RemoteException {
+ List<FeedPolicy> policies = new ArrayList<FeedPolicy>();
+ try {
+ ITupleReference dataverseKey = createTuple(dataverse);
+ // NOTE(review): the 'false' argument presumably selects a read-only translator - confirm.
+ IValueExtractor<FeedPolicy> policyExtractor = new MetadataEntityValueExtractor<FeedPolicy>(
+ new FeedPolicyTupleTranslator(false));
+ searchIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, dataverseKey, policyExtractor, policies);
+ } catch (Exception e) {
+ throw new MetadataException(e);
+ }
+ return policies;
+ }
+
+ @Override
public void addExternalFile(JobId jobId, ExternalFile externalFile) throws MetadataException, RemoteException {
try {
// Insert into the 'externalFiles' dataset.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
index 826ebbf..24962b5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
@@ -26,11 +26,14 @@ import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.asterix.metadata.entities.Feed.FeedType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.Library;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
/**
@@ -228,8 +231,16 @@ public class MetadataTransactionContext extends MetadataCache {
}
- public void dropFeed(String dataverse, String feedName) {
- Feed feed = new Feed(dataverse, feedName, null, null, null);
+ /**
+ * Records the removal of a feed within this metadata transaction.
+ * <p>
+ * A placeholder entity of the requested kind is built from the dataverse and feed names only (every other
+ * constructor argument is null). It is registered with the dropped cache and then passed to logAndApply.
+ * NOTE(review): the 'false' flag given to MetadataLogicalOperation presumably marks a drop rather than an
+ * add - confirm against that class.
+ *
+ * @param dataverseName
+ * dataverse that owns the feed
+ * @param feedName
+ * name of the feed being dropped
+ * @param feedType
+ * kind of feed, which selects the placeholder class (PrimaryFeed or SecondaryFeed)
+ * @throws NullPointerException
+ * if feedType is null (raised by the switch statement)
+ * @throws IllegalArgumentException
+ * if feedType is neither PRIMARY nor SECONDARY
+ */
+ public void dropFeed(String dataverseName, String feedName, FeedType feedType) {
+ final Feed feed;
+ switch (feedType) {
+ case PRIMARY:
+ feed = new PrimaryFeed(dataverseName, feedName, null, null, null);
+ break;
+ case SECONDARY:
+ feed = new SecondaryFeed(dataverseName, feedName, null, null);
+ break;
+ default:
+ // Fail fast rather than hand a null feed to the dropped cache and the operation log.
+ throw new IllegalArgumentException("Unsupported feed type " + feedType + " for feed " + feedName);
+ }
droppedCache.addFeedIfNotExists(feed);
logAndApply(new MetadataLogicalOperation(feed, false));
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
deleted file mode 100644
index 049a45c..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package edu.uci.ics.asterix.metadata.api;
-
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.Set;
-
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
-
-public interface IClusterEventsSubscriber {
-
- /**
- * @param deadNodeIds
- * @return
- */
- public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds);
-
- /**
- * @param joinedNodeId
- * @return
- */
- public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId);
-
- /**
- * @param response
- */
- public void notifyRequestCompletion(IClusterManagementWorkResponse response);
-
- /**
- * @param previousState
- * @param newState
- */
- public void notifyStateChange(State previousState, State newState);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java
deleted file mode 100644
index 65ac354..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.api;
-
-public interface IClusterManagementWork {
-
- public enum WorkType {
- ADD_NODE,
- REMOVE_NODE
- }
-
- public WorkType getClusterManagementWorkType();
-
- public int getWorkId();
-
- public IClusterEventsSubscriber getSourceSubscriber();
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
index ea07a62..0351468 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
@@ -2,6 +2,7 @@ package edu.uci.ics.asterix.metadata.api;
import java.util.Set;
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.event.schema.cluster.Node;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 0b3bae6..6c098fb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -19,7 +19,6 @@ import java.rmi.RemoteException;
import java.util.List;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -30,8 +29,6 @@ import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
@@ -489,26 +486,8 @@ public interface IMetadataManager {
*/
public void dropFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException;
- /**
- * @param ctx
- * @param feedId
- * @param feedActivity
- * @throws MetadataException
- */
- public void registerFeedActivity(MetadataTransactionContext ctx, FeedConnectionId feedId, FeedActivity feedActivity)
- throws MetadataException;
-
- /**
- * @param ctx
- * @param dataverseName
- * @param datasetName
- * @return
- * @throws MetadataException
- */
- public FeedActivity getRecentActivityOnFeedConnection(MetadataTransactionContext ctx, FeedConnectionId feedId,
- FeedActivityType... activityTypeFilter) throws MetadataException;
-
- /**
+
+ /**
* @param ctx
* @param policy
* @throws MetadataException
@@ -525,19 +504,7 @@ public interface IMetadataManager {
public FeedPolicy getFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName)
throws MetadataException;
- /**
- * @param ctx
- * @param dataverse
- * @param dataset
- * @return
- * @throws MetadataException
- */
- public List<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx, String dataverse, String dataset)
- throws MetadataException;
-
- public List<FeedActivity> getConnectFeedActivitiesForFeed(MetadataTransactionContext ctx, String dataverse,
- String dataset) throws MetadataException;
-
+
public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
public int getMostRecentDatasetId() throws MetadataException;