Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:03 UTC
[06/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
deleted file mode 100644
index 6c71036..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.asterix.metadata.entitytupletranslators;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.asterix.builders.IARecordBuilder;
-import edu.uci.ics.asterix.builders.RecordBuilder;
-import edu.uci.ics.asterix.builders.UnorderedListBuilder;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
-import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
-import edu.uci.ics.asterix.om.base.AInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableString;
-import edu.uci.ics.asterix.om.base.ARecord;
-import edu.uci.ics.asterix.om.base.AString;
-import edu.uci.ics.asterix.om.base.AUnorderedList;
-import edu.uci.ics.asterix.om.base.IACursor;
-import edu.uci.ics.asterix.om.types.AUnorderedListType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-/**
- * Translates a Dataset metadata entity to an ITupleReference and vice versa.
- */
-public class FeedActivityTupleTranslator extends AbstractTupleTranslator<FeedActivity> {
- // Field indexes of serialized FeedActivity in a tuple.
- // Key field.
- public static final int FEED_ACTIVITY_ACTIVITY_DATAVERSE_NAME_FIELD_INDEX = 0;
-
- public static final int FEED_ACTIVITY_ACTIVITY_FEED_NAME_FIELD_INDEX = 1;
-
- public static final int FEED_ACTIVITY_ACTIVITY_DATASET_NAME_FIELD_INDEX = 2;
-
- public static final int FEED_ACTIVITY_ACTIVITY_ID_FIELD_INDEX = 3;
-
- // Payload field containing serialized FeedActivity.
- public static final int FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX = 4;
-
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE);
- private AMutableInt32 aInt32;
- protected ISerializerDeserializer<AInt32> aInt32Serde;
-
- @SuppressWarnings("unchecked")
- public FeedActivityTupleTranslator(boolean getTuple) {
- super(getTuple, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET.getFieldCount());
- aInt32 = new AMutableInt32(-1);
- aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- }
-
- @Override
- public FeedActivity getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
- byte[] serRecord = frameTuple.getFieldData(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
- int recordStartOffset = frameTuple.getFieldStart(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
- int recordLength = frameTuple.getFieldLength(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
- ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
- DataInput in = new DataInputStream(stream);
- ARecord feedActivityRecord = (ARecord) recordSerDes.deserialize(in);
- return createFeedActivityFromARecord(feedActivityRecord);
- }
-
- private FeedActivity createFeedActivityFromARecord(ARecord feedActivityRecord) {
-
- String dataverseName = ((AString) feedActivityRecord
- .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
- String feedName = ((AString) feedActivityRecord
- .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_FEED_NAME_FIELD_INDEX)).getStringValue();
- String datasetName = ((AString) feedActivityRecord
- .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX)).getStringValue();
- int activityId = ((AInt32) feedActivityRecord
- .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_ID_FIELD_INDEX)).getIntegerValue();
- String feedActivityType = ((AString) feedActivityRecord
- .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX)).getStringValue();
-
- IACursor cursor = ((AUnorderedList) feedActivityRecord
- .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX)).getCursor();
- Map<String, String> activityDetails = new HashMap<String, String>();
- String key;
- String value;
- while (cursor.next()) {
- ARecord field = (ARecord) cursor.get();
- key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
- value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
- activityDetails.put(key, value);
- }
-
- String feedActivityTimestamp = ((AString) feedActivityRecord
- .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX))
- .getStringValue();
-
- FeedActivity fa = new FeedActivity(dataverseName, feedName, datasetName,
- FeedActivityType.valueOf(feedActivityType), activityDetails);
- fa.setLastUpdatedTimestamp(feedActivityTimestamp);
- fa.setActivityId(activityId);
- return fa;
- }
-
- @Override
- public ITupleReference getTupleFromMetadataEntity(FeedActivity feedActivity) throws IOException, MetadataException {
- // write the key in the first three fields of the tuple
- ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage();
-
- tupleBuilder.reset();
- aString.setValue(feedActivity.getDataverseName());
- stringSerde.serialize(aString, tupleBuilder.getDataOutput());
- tupleBuilder.addFieldEndOffset();
-
- aString.setValue(feedActivity.getFeedName());
- stringSerde.serialize(aString, tupleBuilder.getDataOutput());
- tupleBuilder.addFieldEndOffset();
-
- aString.setValue(feedActivity.getDatasetName());
- stringSerde.serialize(aString, tupleBuilder.getDataOutput());
- tupleBuilder.addFieldEndOffset();
-
- aInt32.setValue(feedActivity.getActivityId());
- int32Serde.serialize(aInt32, tupleBuilder.getDataOutput());
- tupleBuilder.addFieldEndOffset();
- // write the pay-load in the 2nd field of the tuple
-
- recordBuilder.reset(MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE);
-
- // write field 0
- fieldValue.reset();
- aString.setValue(feedActivity.getDataverseName());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
-
- // write field 1
- fieldValue.reset();
- aString.setValue(feedActivity.getFeedName());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_FEED_NAME_FIELD_INDEX, fieldValue);
-
- // write field 2
- fieldValue.reset();
- aString.setValue(feedActivity.getDatasetName());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX, fieldValue);
-
- // write field 3
- fieldValue.reset();
- aInt32.setValue(feedActivity.getActivityId());
- int32Serde.serialize(aInt32, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_ID_FIELD_INDEX, fieldValue);
-
- // write field 4
- fieldValue.reset();
- aString.setValue(feedActivity.getFeedActivityType().name());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX, fieldValue);
-
- // write field 5
- Map<String, String> properties = feedActivity.getFeedActivityDetails();
- UnorderedListBuilder listBuilder = new UnorderedListBuilder();
- listBuilder
- .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX]);
- for (Map.Entry<String, String> property : properties.entrySet()) {
- String name = property.getKey();
- String value = property.getValue();
- itemValue.reset();
- writePropertyTypeRecord(name, value, itemValue.getDataOutput());
- listBuilder.addItem(itemValue);
- }
- fieldValue.reset();
- listBuilder.write(fieldValue.getDataOutput(), true);
- recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX, fieldValue);
-
- // write field 6
- fieldValue.reset();
- aString.setValue(Calendar.getInstance().getTime().toString());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX, fieldValue);
-
- // write record
- try {
- recordBuilder.write(tupleBuilder.getDataOutput(), true);
- } catch (AsterixException e) {
- throw new MetadataException(e);
- }
- tupleBuilder.addFieldEndOffset();
-
- tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
- return tuple;
- }
-
- public void writePropertyTypeRecord(String name, String value, DataOutput out) throws HyracksDataException {
- IARecordBuilder propertyRecordBuilder = new RecordBuilder();
- ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
- propertyRecordBuilder.reset(MetadataRecordTypes.FEED_ACTIVITY_DETAILS_RECORDTYPE);
- AMutableString aString = new AMutableString("");
- ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ASTRING);
-
- // write field 0
- fieldValue.reset();
- aString.setValue(name);
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- propertyRecordBuilder.addField(0, fieldValue);
-
- // write field 1
- fieldValue.reset();
- aString.setValue(value);
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- propertyRecordBuilder.addField(1, fieldValue);
-
- try {
- propertyRecordBuilder.write(out, true);
- } catch (IOException | AsterixException e) {
- throw new HyracksDataException(e);
- }
- }
-
-}
\ No newline at end of file
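For readers skimming this removal: the tuple-translator pattern the deleted class follows (and which survives in the remaining translators of this package) is a symmetric round trip between a metadata entity and an ITupleReference. A minimal sketch of a call site, with illustrative values and checked-exception handling elided (the surrounding MetadataNode plumbing is assumed and is not part of this commit):

    // Sketch of the translator round trip (values are illustrative).
    FeedActivityTupleTranslator translator = new FeedActivityTupleTranslator(true);

    Map<String, String> details = new HashMap<String, String>();
    FeedActivityType type = FeedActivityType.values()[0]; // any activity type; constants not shown here
    FeedActivity activity = new FeedActivity("feeds", "TwitterFeed", "Tweets", type, details);

    // Entity -> tuple, as done before inserting into the metadata index ...
    ITupleReference tuple = translator.getTupleFromMetadataEntity(activity);

    // ... and tuple -> entity, as done when scanning the index back.
    FeedActivity roundTripped = translator.getMetadataEntityFromTuple(tuple);
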
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
index def0938..32ed123 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
@@ -25,8 +25,8 @@ import java.util.HashMap;
import java.util.Map;
import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.OrderedListBuilder;
import edu.uci.ics.asterix.builders.RecordBuilder;
-import edu.uci.ics.asterix.builders.UnorderedListBuilder;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -34,8 +34,9 @@ import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.om.base.AInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.metadata.entities.Feed.FeedType;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
import edu.uci.ics.asterix.om.base.AMutableString;
import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.base.ARecord;
@@ -65,14 +66,9 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
@SuppressWarnings("unchecked")
private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(MetadataRecordTypes.FEED_RECORDTYPE);
- private AMutableInt32 aInt32;
- protected ISerializerDeserializer<AInt32> aInt32Serde;
- @SuppressWarnings("unchecked")
public FeedTupleTranslator(boolean getTuple) {
super(getTuple, MetadataPrimaryIndexes.FEED_DATASET.getFieldCount());
- aInt32 = new AMutableInt32(-1);
- aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
}
@Override
@@ -92,49 +88,64 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
.getValueByPos(MetadataRecordTypes.FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
String feedName = ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FEED_NAME_FIELD_INDEX))
.getStringValue();
- String adapterName = ((AString) feedRecord
- .getValueByPos(MetadataRecordTypes.FEED_ARECORD_ADAPTER_NAME_FIELD_INDEX)).getStringValue();
-
- IACursor cursor = ((AUnorderedList) feedRecord
- .getValueByPos(MetadataRecordTypes.FEED_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX)).getCursor();
- String key;
- String value;
- Map<String, String> adapterConfiguration = new HashMap<String, String>();
- while (cursor.next()) {
- ARecord field = (ARecord) cursor.get();
- key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
- value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
- adapterConfiguration.put(key, value);
- }
Object o = feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FUNCTION_FIELD_INDEX);
FunctionSignature signature = null;
if (!(o instanceof ANull)) {
- String functionIdentifier = ((AString) o).getStringValue();
- String[] qnameComponents = functionIdentifier.split("\\.");
- String functionDataverse;
- String functionName;
- if (qnameComponents.length == 2) {
- functionDataverse = qnameComponents[0];
- functionName = qnameComponents[1];
- } else {
- functionDataverse = dataverseName;
- functionName = qnameComponents[0];
+ String functionName = ((AString) o).getStringValue();
+ signature = new FunctionSignature(dataverseName, functionName, 1);
+ }
+
+ String feedType = ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FEED_TYPE_FIELD_INDEX))
+ .getStringValue();
+
+ FeedType feedTypeEnum = FeedType.valueOf(feedType.toUpperCase());
+ switch (feedTypeEnum) {
+ case PRIMARY: {
+ ARecord feedTypeDetailsRecord = (ARecord) feedRecord
+ .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX);
+ String adapterName = ((AString) feedTypeDetailsRecord
+ .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX))
+ .getStringValue();
+
+ IACursor cursor = ((AUnorderedList) feedTypeDetailsRecord
+ .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX))
+ .getCursor();
+ String key;
+ String value;
+ Map<String, String> adaptorConfiguration = new HashMap<String, String>();
+ while (cursor.next()) {
+ ARecord field = (ARecord) cursor.get();
+ key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
+ .getStringValue();
+ value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
+ .getStringValue();
+ adaptorConfiguration.put(key, value);
+ }
+ feed = new PrimaryFeed(dataverseName, feedName, adapterName, adaptorConfiguration, signature);
+
}
+ break;
+ case SECONDARY: {
+ ARecord feedTypeDetailsRecord = (ARecord) feedRecord
+ .getValueByPos(MetadataRecordTypes.FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX);
+
+ String sourceFeedName = ((AString) feedTypeDetailsRecord
+ .getValueByPos(MetadataRecordTypes.FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX))
+ .getStringValue();
+
+ feed = new SecondaryFeed(dataverseName, feedName, sourceFeedName, signature);
- String[] nameComponents = functionName.split("@");
- signature = new FunctionSignature(functionDataverse, nameComponents[0], Integer.parseInt(nameComponents[1]));
+ }
+ break;
}
- feed = new Feed(dataverseName, feedName, adapterName, adapterConfiguration, signature);
return feed;
}
@Override
public ITupleReference getTupleFromMetadataEntity(Feed feed) throws IOException, MetadataException {
- // write the key in the first three fields of the tuple
- ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage();
-
+ // write the key in the first two fields of the tuple
tupleBuilder.reset();
aString.setValue(feed.getDataverseName());
stringSerde.serialize(aString, tupleBuilder.getDataOutput());
@@ -160,35 +171,23 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
// write field 2
fieldValue.reset();
- aString.setValue(feed.getAdapterName());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_ADAPTER_NAME_FIELD_INDEX, fieldValue);
-
- // write field 3 (adapterConfiguration)
- Map<String, String> adapterConfiguration = feed.getAdapterConfiguration();
- UnorderedListBuilder listBuilder = new UnorderedListBuilder();
- listBuilder
- .reset((AUnorderedListType) MetadataRecordTypes.FEED_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX]);
- for (Map.Entry<String, String> property : adapterConfiguration.entrySet()) {
- String name = property.getKey();
- String value = property.getValue();
- itemValue.reset();
- writePropertyTypeRecord(name, value, itemValue.getDataOutput());
- listBuilder.addItem(itemValue);
- }
- fieldValue.reset();
- listBuilder.write(fieldValue.getDataOutput(), true);
- recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX, fieldValue);
-
- // write field 4
- fieldValue.reset();
if (feed.getAppliedFunction() != null) {
- aString.setValue(feed.getAppliedFunction().toString());
+ aString.setValue(feed.getAppliedFunction().getName());
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_FUNCTION_FIELD_INDEX, fieldValue);
}
- // write field 5
+ // write field 3
+ fieldValue.reset();
+ aString.setValue(feed.getFeedType().name().toUpperCase());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_FEED_TYPE_FIELD_INDEX, fieldValue);
+
+ // write field 4/5
+ fieldValue.reset();
+ writeFeedTypeDetailsRecordType(recordBuilder, feed, fieldValue);
+
+ // write field 6
fieldValue.reset();
aString.setValue(Calendar.getInstance().getTime().toString());
stringSerde.serialize(aString, fieldValue.getDataOutput());
@@ -206,6 +205,85 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
return tuple;
}
+ @SuppressWarnings("unchecked")
+ private void writeFeedTypeDetailsRecordType(IARecordBuilder recordBuilder, Feed feed,
+ ArrayBackedValueStorage fieldValue) throws HyracksDataException {
+
+ switch (feed.getFeedType()) {
+ case PRIMARY: {
+ PrimaryFeed primaryFeed = (PrimaryFeed) feed;
+
+ IARecordBuilder primaryDetailsRecordBuilder = new RecordBuilder();
+ OrderedListBuilder listBuilder = new OrderedListBuilder();
+ ArrayBackedValueStorage primaryRecordfieldValue = new ArrayBackedValueStorage();
+ ArrayBackedValueStorage primaryRecordItemValue = new ArrayBackedValueStorage();
+ primaryDetailsRecordBuilder.reset(MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE);
+
+ AMutableString aString = new AMutableString("");
+ ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ASTRING);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(primaryFeed.getAdaptorName());
+ stringSerde.serialize(aString, primaryRecordfieldValue.getDataOutput());
+ primaryDetailsRecordBuilder.addField(
+ MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX,
+ primaryRecordfieldValue);
+
+ // write field 1
+ listBuilder
+ .reset((AUnorderedListType) MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX]);
+ for (Map.Entry<String, String> property : primaryFeed.getAdaptorConfiguration().entrySet()) {
+ String name = property.getKey();
+ String value = property.getValue();
+ primaryRecordItemValue.reset();
+ writePropertyTypeRecord(name, value, primaryRecordItemValue.getDataOutput());
+ listBuilder.addItem(primaryRecordItemValue);
+ }
+ primaryRecordfieldValue.reset();
+ listBuilder.write(primaryRecordfieldValue.getDataOutput(), true);
+ primaryDetailsRecordBuilder.addField(
+ MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX,
+ primaryRecordfieldValue);
+
+ try {
+ primaryDetailsRecordBuilder.write(fieldValue.getDataOutput(), true);
+ } catch (IOException | AsterixException e) {
+ throw new HyracksDataException(e);
+ }
+
+ recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX, fieldValue);
+ }
+ break;
+
+ case SECONDARY:
+ SecondaryFeed secondaryFeed = (SecondaryFeed) feed;
+
+ IARecordBuilder secondaryDetailsRecordBuilder = new RecordBuilder();
+ ArrayBackedValueStorage secondaryFieldValue = new ArrayBackedValueStorage();
+ secondaryDetailsRecordBuilder.reset(MetadataRecordTypes.SECONDARY_FEED_DETAILS_RECORDTYPE);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(secondaryFeed.getSourceFeedName());
+ stringSerde.serialize(aString, secondaryFieldValue.getDataOutput());
+ secondaryDetailsRecordBuilder.addField(
+ MetadataRecordTypes.FEED_ARECORD_SECONDARY_FIELD_DETAILS_SOURCE_FEED_NAME_FIELD_INDEX,
+ secondaryFieldValue);
+
+ try {
+ secondaryDetailsRecordBuilder.write(fieldValue.getDataOutput(), true);
+ } catch (IOException | AsterixException e) {
+ throw new HyracksDataException(e);
+ }
+ recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX, fieldValue);
+ break;
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
public void writePropertyTypeRecord(String name, String value, DataOutput out) throws HyracksDataException {
IARecordBuilder propertyRecordBuilder = new RecordBuilder();
ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
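The net effect of this change is that a Feed metadata record now carries an explicit FeedType discriminator plus a type-specific details sub-record, mirroring the two entity classes it deserializes into. A sketch at construction time, using the constructors visible in the hunk above (argument values are illustrative; the applied-function signature is left null, as the code permits):

    // A primary feed ingests directly through an adaptor with its own configuration ...
    Map<String, String> adaptorConfig = new HashMap<String, String>();
    adaptorConfig.put("type-name", "TweetType");
    Feed primary = new PrimaryFeed("feeds", "TwitterFeed", "push_twitter", adaptorConfig, null);

    // ... while a secondary feed reads the output of another feed.
    Feed secondary = new SecondaryFeed("feeds", "ProcessedTwitterFeed", "TwitterFeed", null);
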
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/IAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/IAdapterFactory.java
new file mode 100644
index 0000000..7fc2e7e
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/IAdapterFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.external;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
+ * An implementation provides the functionality for creating and configuring
+ * an adapter that reads from and/or writes to an external data source.
+ */
+public interface IAdapterFactory extends Serializable {
+
+ public static final String KEY_TYPE_NAME = "type-name";
+
+ public enum SupportedOperation {
+ READ,
+ WRITE,
+ READ_WRITE
+ }
+
+ /**
+ * Returns the kind of operation the adapter supports: reading from an
+ * external data source, writing to an external data source, or both.
+ *
+ * @see SupportedOperation
+ * @return the supported operation
+ */
+ public SupportedOperation getSupportedOperations();
+
+ /**
+ * Returns the display name corresponding to the Adapter type that is created by the factory.
+ *
+ * @return the display name
+ */
+ public String getName();
+
+ /**
+ * Gets a list of partition constraints. A partition constraint can be a
+ * requirement to execute at a particular location, or a cardinality
+ * constraint indicating the number of instances that need to run in
+ * parallel. For example, an IDatasourceAdapter implementation written for
+ * data residing on the local file system of a node cannot run on any other
+ * node and thus has a location partition constraint. The location partition
+ * constraint can be expressed as a node IP address or a node controller id;
+ * in the former case, the IP address is translated to the id of a node
+ * controller running on the node with the given IP address.
+ */
+ public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
+
+ /**
+ * Creates an instance of IDatasourceAdapter.
+ *
+ * @param ctx the Hyracks task context
+ * @param partition the partition the created adapter instance will serve
+ * @return An instance of IDatasourceAdapter.
+ * @throws Exception
+ */
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception;
+
+ /**
+ * Configures the factory using the user-supplied properties and the record
+ * type that created adapters will produce.
+ *
+ * @param configuration adapter-specific key/value properties
+ * @param outputType the record type produced by adapters from this factory
+ * @throws Exception
+ */
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception;
+
+ /**
+ * Gets the record type associated with the output of the adapter.
+ *
+ * @return the adapter's output record type
+ */
+ public ARecordType getAdapterOutputType();
+
+}
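To make the contract concrete, a minimal read-only factory could look like the sketch below. This is illustrative only: LocalFileAdapter is a hypothetical adapter class, the node name "nc1" is illustrative, and AlgebricksAbsolutePartitionConstraint is assumed to be the location-constraint implementation used elsewhere in the codebase.

    public class LocalFileAdapterFactory implements IAdapterFactory {

        private static final long serialVersionUID = 1L;

        private Map<String, String> configuration;
        private ARecordType outputType;

        @Override
        public SupportedOperation getSupportedOperations() {
            return SupportedOperation.READ; // a read-only external source
        }

        @Override
        public String getName() {
            return "localfs";
        }

        @Override
        public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
            // The file lives on one node, so pin execution to that node controller.
            return new AlgebricksAbsolutePartitionConstraint(new String[] { "nc1" });
        }

        @Override
        public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
            this.configuration = configuration;
            this.outputType = outputType;
        }

        @Override
        public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
            return new LocalFileAdapter(ctx, partition, configuration, outputType); // hypothetical adapter
        }

        @Override
        public ARecordType getAdapterOutputType() {
            return outputType;
        }
    }
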
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
index 9e8e5f7..3bc402b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
@@ -14,9 +14,9 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
-import java.util.HashMap;
import java.util.Map;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -29,23 +29,11 @@ public abstract class AbstractDatasourceAdapter implements IDatasourceAdapter {
private static final long serialVersionUID = 1L;
+ public static final String KEY_PARSER_FACTORY = "parser";
+
protected Map<String, Object> configuration;
protected transient AlgebricksPartitionConstraint partitionConstraint;
protected IAType atype;
protected IHyracksTaskContext ctx;
- protected static final Map<String, Object> formatToParserFactoryMap = initializeFormatParserFactoryMap();
-
- public static final String KEY_FORMAT = "format";
- public static final String KEY_PARSER_FACTORY = "parser";
- public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
- public static final String FORMAT_ADM = "adm";
-
- private static Map<String, Object> initializeFormatParserFactoryMap() {
- Map<String, Object> map = new HashMap<String, Object>();
- map.put(FORMAT_DELIMITED_TEXT, "edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory");
- map.put(FORMAT_ADM, "edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory");
- return map;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
new file mode 100644
index 0000000..86ca550
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+
+
+public abstract class AbstractFeedDatasourceAdapter implements IDatasourceAdapter {
+
+ private static final long serialVersionUID = 1L;
+
+ protected FeedPolicyEnforcer policyEnforcer;
+
+ public FeedPolicyEnforcer getPolicyEnforcer() {
+ return policyEnforcer;
+ }
+
+ public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
+ this.policyEnforcer = policyEnforcer;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterExecutor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterExecutor.java
new file mode 100644
index 0000000..6bd726c
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterExecutor.java
@@ -0,0 +1,56 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager.State;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+
+public class AdapterExecutor implements Runnable {
+
+ private static final Logger LOGGER = Logger.getLogger(AdapterExecutor.class.getName());
+
+ private final DistributeFeedFrameWriter writer;
+
+ private final IFeedAdapter adapter;
+
+ private final IAdapterRuntimeManager adapterManager;
+
+ public AdapterExecutor(int partition, DistributeFeedFrameWriter writer, IFeedAdapter adapter,
+ IAdapterRuntimeManager adapterManager) {
+ this.writer = writer;
+ this.adapter = adapter;
+ this.adapterManager = adapterManager;
+ }
+
+ @Override
+ public void run() {
+ int partition = adapterManager.getPartition();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting ingestion for partition:" + partition);
+ }
+ boolean continueIngestion = true;
+ boolean failedIngestion = false;
+ while (continueIngestion) {
+ try {
+ adapter.start(partition, writer);
+ continueIngestion = false;
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Exception during feed ingestion " + e.getMessage());
+ e.printStackTrace();
+ }
+ continueIngestion = adapter.handleException(e);
+ failedIngestion = !continueIngestion;
+ }
+ }
+
+ adapterManager.setState(failedIngestion ? State.FAILED_INGESTION : State.FINISHED_INGESTION);
+ synchronized (adapterManager) {
+ adapterManager.notifyAll();
+ }
+ }
+
+}
\ No newline at end of file
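Since the executor's only completion signal is the notifyAll() on the adapterManager monitor, a caller that must block until ingestion ends can wait on that same monitor. A sketch, assuming the runtime manager exposes getState() as AdapterRuntimeManager below does (InterruptedException handling elided):

    // Hypothetical wait-for-completion built on the executor's notifyAll() above.
    synchronized (adapterManager) {
        while (adapterManager.getState() != State.FINISHED_INGESTION
                && adapterManager.getState() != State.FAILED_INGESTION) {
            adapterManager.wait(); // woken when the executor finishes or fails
        }
    }
    boolean failed = (adapterManager.getState() == State.FAILED_INGESTION);
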
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
index 897faae..b032bab 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
@@ -17,40 +17,46 @@ package edu.uci.ics.asterix.metadata.feeds;
import java.io.Serializable;
/**
- * A unique identifier for a datasource adapter.
+ * A unique identifier for a data source adapter.
*/
public class AdapterIdentifier implements Serializable {
private static final long serialVersionUID = 1L;
private final String namespace;
- private final String adapterName;
+ private final String name;
- public AdapterIdentifier(String namespace, String adapterName) {
+ public AdapterIdentifier(String namespace, String name) {
this.namespace = namespace;
- this.adapterName = adapterName;
+ this.name = name;
}
public String getNamespace() {
return namespace;
}
- public String getAdapterName() {
- return adapterName;
+ public String getName() {
+ return name;
}
@Override
public int hashCode() {
- return (namespace + "@" + adapterName).hashCode();
+ return (namespace + "@" + name).hashCode();
}
@Override
public boolean equals(Object o) {
+ if (o == null) {
+ return false;
+ }
+ if (this == o) {
+ return true;
+ }
if (!(o instanceof AdapterIdentifier)) {
return false;
}
return namespace.equals(((AdapterIdentifier) o).getNamespace())
- && namespace.equals(((AdapterIdentifier) o).getNamespace());
+ && name.equals(((AdapterIdentifier) o).getName());
}
}
\ No newline at end of file
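Note that the equals() change here is substantive, not cosmetic: the old body compared namespace twice, so two identifiers that differed only in adapter name compared equal. A quick sketch of the corrected contract using plain assertions:

    AdapterIdentifier a = new AdapterIdentifier("feeds", "push_twitter");
    AdapterIdentifier b = new AdapterIdentifier("feeds", "pull_twitter");
    AdapterIdentifier c = new AdapterIdentifier("feeds", "push_twitter");

    assert !a.equals(b);                 // was (wrongly) true before this fix
    assert a.equals(c);                  // same namespace and name
    assert a.hashCode() == c.hashCode(); // equal objects share a hash code
    assert !a.equals(null);              // now guarded explicitly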
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
index b9a5e73..de288ba 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -15,68 +15,51 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.metadata.feeds.FeedFrameWriter.Mode;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.IngestionRuntime;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
-public class AdapterRuntimeManager implements IAdapterExecutor {
+public class AdapterRuntimeManager implements IAdapterRuntimeManager {
private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
- private final FeedConnectionId feedId;
+ private final FeedId feedId;
- private IFeedAdapter feedAdapter;
+ private final IFeedAdapter feedAdapter;
- private AdapterExecutor adapterExecutor;
+ private final IIntakeProgressTracker tracker;
- private State state;
+ private final AdapterExecutor adapterExecutor;
+
+ private final int partition;
- private int partition;
+ private final ExecutorService executorService;
private IngestionRuntime ingestionRuntime;
- private final IFeedManager feedManager;
-
- public enum State {
- /*
- * Indicates that data from external source will be pushed downstream for storage
- */
- ACTIVE_INGESTION,
- /*
- * Indicates that data from external source would be buffered and not pushed downstream
- */
- INACTIVE_INGESTION,
- /*
- * Indicates that feed ingestion activity has finished
- */
- FINISHED_INGESTION
- }
+ private State state;
- public AdapterRuntimeManager(FeedConnectionId feedId, IFeedAdapter feedAdapter, FeedFrameWriter writer,
- int partition, LinkedBlockingQueue<IFeedMessage> inbox, IFeedManager feedManager) {
+ public AdapterRuntimeManager(FeedId feedId, IFeedAdapter feedAdapter, IIntakeProgressTracker tracker,
+ DistributeFeedFrameWriter writer, int partition) {
this.feedId = feedId;
this.feedAdapter = feedAdapter;
+ this.tracker = tracker;
this.partition = partition;
- this.feedManager = feedManager;
this.adapterExecutor = new AdapterExecutor(partition, writer, feedAdapter, this);
+ this.executorService = Executors.newSingleThreadExecutor();
+ this.state = State.INACTIVE_INGESTION;
}
@Override
public void start() throws Exception {
state = State.ACTIVE_INGESTION;
- ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, this);
- feedManager.registerFeedRuntime(ingestionRuntime);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Registered feed runtime manager for " + this.getFeedId());
- }
- ExecutorService executorService = feedManager.getFeedExecutorService(feedId);
executorService.execute(adapterExecutor);
}
@@ -84,19 +67,18 @@ public class AdapterRuntimeManager implements IAdapterExecutor {
public void stop() {
try {
feedAdapter.stop();
- state = State.FINISHED_INGESTION;
- synchronized (this) {
- notifyAll();
- }
} catch (Exception exception) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Unable to stop adapter " + feedAdapter + ", encountered exception " + exception);
}
+ } finally {
+ state = State.FINISHED_INGESTION;
+ executorService.shutdown();
}
}
@Override
- public FeedConnectionId getFeedId() {
+ public FeedId getFeedId() {
return feedId;
}
@@ -109,84 +91,15 @@ public class AdapterRuntimeManager implements IAdapterExecutor {
return feedAdapter;
}
- public void setFeedAdapter(IFeedAdapter feedAdapter) {
- this.feedAdapter = feedAdapter;
- }
-
- public static class AdapterExecutor implements Runnable {
-
- private FeedFrameWriter writer;
-
- private IFeedAdapter adapter;
-
- private AdapterRuntimeManager runtimeManager;
-
- public AdapterExecutor(int partition, FeedFrameWriter writer, IFeedAdapter adapter,
- AdapterRuntimeManager adapterRuntimeMgr) {
- this.writer = writer;
- this.adapter = adapter;
- this.runtimeManager = adapterRuntimeMgr;
- }
-
- @Override
- public void run() {
- try {
- int partition = runtimeManager.getPartition();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting ingestion for partition:" + partition);
- }
- adapter.start(partition, writer);
- runtimeManager.setState(State.FINISHED_INGESTION);
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Exception during feed ingestion " + e.getMessage());
- }
- } finally {
- synchronized (runtimeManager) {
- runtimeManager.notifyAll();
- }
- }
- }
-
- public FeedFrameWriter getWriter() {
- return writer;
- }
-
- public void setWriter(IFrameWriter writer) {
- if (this.writer != null) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Switching writer to:" + writer + " from " + this.writer);
- }
- this.writer.setWriter(writer);
- }
- }
-
+ public IIntakeProgressTracker getTracker() {
+ return tracker;
}
public synchronized State getState() {
return state;
}
- @SuppressWarnings("incomplete-switch")
- public synchronized void setState(State state) throws HyracksDataException {
- if (this.state.equals(state)) {
- return;
- }
- switch (state) {
- case INACTIVE_INGESTION:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Set " + Mode.STORE + " mode");
- }
- adapterExecutor.getWriter().setMode(Mode.STORE);
- break;
- case ACTIVE_INGESTION:
- adapterExecutor.getWriter().setMode(Mode.FORWARD);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Set " + Mode.FORWARD + " mode");
- }
- break;
- }
+ public synchronized void setState(State state) {
this.state = state;
}
@@ -194,6 +107,7 @@ public class AdapterRuntimeManager implements IAdapterExecutor {
return adapterExecutor;
}
+ @Override
public int getPartition() {
return partition;
}
@@ -202,4 +116,9 @@ public class AdapterRuntimeManager implements IAdapterExecutor {
return ingestionRuntime;
}
+ @Override
+ public IIntakeProgressTracker getProgressTracker() {
+ return tracker;
+ }
+
}
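With the executor service now owned by the manager itself, the intake-side lifecycle reduces to the sketch below (the feedId, adapter, tracker and writer instances are assumed to come from the intake operator that hosts the manager):

    // Construct and start: the manager runs its AdapterExecutor on a private
    // single-thread executor, so start() returns immediately.
    AdapterRuntimeManager manager =
            new AdapterRuntimeManager(feedId, adapter, tracker, writer, partition);
    manager.start(); // state -> ACTIVE_INGESTION

    // Later: stop the adapter; the finally block above guarantees the state
    // flips to FINISHED_INGESTION and the private executor is shut down.
    manager.stop();
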
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
index 3bd73a3..3d29aeb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -17,6 +17,7 @@ package edu.uci.ics.asterix.metadata.feeds;
import java.util.HashMap;
import java.util.Map;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
@@ -26,16 +27,22 @@ public class BuiltinFeedPolicies {
public static final FeedPolicy BASIC = initializeBasicPolicy();
- public static final FeedPolicy BASIC_MONITORED = initializeBasicMonitoredPolicy();
+ public static final FeedPolicy BASIC_FT = initializeBasicFTPolicy();
- public static final FeedPolicy FAULT_TOLERANT_BASIC_MONITORED = initializeFaultTolerantBasicMonitoredPolicy();
+ public static final FeedPolicy ADVANCED_FT = initializeAdvancedFTPolicy();
- public static final FeedPolicy ELASTIC = initializeFaultTolerantBasicMonitoredElasticPolicy();
+ public static final FeedPolicy ADVANCED_FT_DISCARD = initializeAdvancedFTDiscardPolicy();
- public static final FeedPolicy[] policies = new FeedPolicy[] { BRITTLE, BASIC, BASIC_MONITORED,
- FAULT_TOLERANT_BASIC_MONITORED, ELASTIC };
+ public static final FeedPolicy ADVANCED_FT_SPILL = initializeAdvancedFTSpillPolicy();
- public static final FeedPolicy DEFAULT_POLICY = BASIC;
+ public static final FeedPolicy ADVANCED_FT_THROTTLE = initializeAdvancedFTThrottlePolicy();
+
+ public static final FeedPolicy ELASTIC = initializeAdvancedFTElasticPolicy();
+
+ public static final FeedPolicy[] policies = new FeedPolicy[] { BRITTLE, BASIC, BASIC_FT, ADVANCED_FT,
+ ADVANCED_FT_DISCARD, ADVANCED_FT_SPILL, ADVANCED_FT_THROTTLE, ELASTIC };
+
+ public static final FeedPolicy DEFAULT_POLICY = BASIC_FT;
public static final String CONFIG_FEED_POLICY_KEY = "policy";
@@ -48,79 +55,134 @@ public class BuiltinFeedPolicies {
return null;
}
- // BMFE
- private static FeedPolicy initializeFaultTolerantBasicMonitoredElasticPolicy() {
+ //Brittle
+ private static FeedPolicy initializeBrittlePolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
- policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
- policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
- policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
- policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
- policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
- policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
- policyParams.put(FeedPolicyAccessor.ELASTIC, "true");
- String description = "Basic Monitored Fault-Tolerant Elastic";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BMFE", description, policyParams);
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "false");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
+ policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "false");
+ policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+ policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+ policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
+
+ String description = "Brittle";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Brittle", description, policyParams);
}
- //BMF
- private static FeedPolicy initializeFaultTolerantBasicMonitoredPolicy() {
+ //Basic
+ private static FeedPolicy initializeBasicPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
- policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
- policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
- policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
- policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
- policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
- String description = "Basic Monitored Fault-Tolerant";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BMF", description, policyParams);
+ policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+ policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
+
+ String description = "Basic";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Basic", description, policyParams);
}
- //BM
- private static FeedPolicy initializeBasicMonitoredPolicy() {
+ // BasicFT
+ private static FeedPolicy initializeBasicFTPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
- policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "false");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
- policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
- policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
- policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+ policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, "false");
+ policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "1");
+ policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+ policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
+ policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "false");
+
String description = "Basic Monitored Fault-Tolerant";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BM", description, policyParams);
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BasicFT", description, policyParams);
}
- //B
- private static FeedPolicy initializeBasicPolicy() {
+ // AdvancedFT
+ private static FeedPolicy initializeAdvancedFTPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
- policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "false");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
- policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "false");
policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
- String description = "Basic";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "B", description, policyParams);
+ policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "true");
+ policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "true");
+
+ String description = "Basic Monitored Fault-Tolerant with at least once semantics";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT", description, policyParams);
}
- //Br
- private static FeedPolicy initializeBrittlePolicy() {
+ // AdvancedFT_Discard
+ private static FeedPolicy initializeAdvancedFTDiscardPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
- policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "false");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "false");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "false");
- policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
- policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "false");
- policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "false");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
- String description = "Brittle";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Br", description, policyParams);
+ policyParams.put(FeedPolicyAccessor.MAX_SPILL_SIZE_ON_DISK, "false");
+ policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "100");
+ policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+ policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
+
+ String description = "AdvancedFT 100% Discard during congestion";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Discard", description,
+ policyParams);
+ }
+
+ // AdvancedFT_Spill
+ private static FeedPolicy initializeAdvancedFTSpillPolicy() {
+ Map<String, String> policyParams = new HashMap<String, String>();
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+ policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+ policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, "" + Boolean.TRUE);
+ policyParams.put(FeedPolicyAccessor.MAX_SPILL_SIZE_ON_DISK, "" + FeedPolicyAccessor.NO_LIMIT);
+ policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "true");
+
+ String description = "AdvancedFT 100% Spill to disk during congestion";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Spill", description, policyParams);
+ }
+
+ // AdvancedFT_Throttle
+ private static FeedPolicy initializeAdvancedFTThrottlePolicy() {
+ Map<String, String> policyParams = new HashMap<String, String>();
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+ policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+ policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, "" + Boolean.FALSE);
+ policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "" + 0);
+ policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+ policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "true");
+
+ String description = "AdvancedFT Throttle during congestion";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Throttle", description,
+ policyParams);
+ }
+
+ // AdvancedFT_Elastic
+ private static FeedPolicy initializeAdvancedFTElasticPolicy() {
+ Map<String, String> policyParams = new HashMap<String, String>();
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+ policyParams.put(FeedPolicyAccessor.ELASTIC, "true");
+ policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+ policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
+
+ String description = "Basic Monitored Fault-Tolerant Elastic";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Elastic", description,
+ policyParams);
}
}
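Each built-in policy is just a named map of FeedPolicyAccessor parameters; resolving the policy requested via a statement's "policy" property reduces to a scan over the array with a fallback to the new default. A sketch (getPolicyName() is the assumed accessor on the FeedPolicy entity):

    // Illustrative lookup: pick the named policy, else the default (BasicFT).
    String requested = "AdvancedFT_Spill"; // e.g. value of CONFIG_FEED_POLICY_KEY
    FeedPolicy chosen = BuiltinFeedPolicies.DEFAULT_POLICY;
    for (FeedPolicy p : BuiltinFeedPolicies.policies) {
        if (p.getPolicyName().equals(requested)) {
            chosen = p;
            break;
        }
    }
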
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
new file mode 100644
index 0000000..398de35
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
@@ -0,0 +1,102 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class CollectTransformFeedFrameWriter implements IFeedOperatorOutputSideHandler {
+
+ private final FeedConnectionId connectionId;
+ private IFrameWriter downstreamWriter;
+ private final FrameTupleAccessor inputFrameTupleAccessor;
+ private final FrameTupleAppender tupleAppender;
+ private final IFrame frame;
+
+ private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
+
+ public CollectTransformFeedFrameWriter(IHyracksTaskContext ctx, IFrameWriter downstreamWriter,
+ ISubscribableRuntime sourceRuntime, RecordDescriptor outputRecordDescriptor, FeedConnectionId connectionId)
+ throws HyracksDataException {
+ this.downstreamWriter = downstreamWriter;
+ RecordDescriptor inputRecordDescriptor = sourceRuntime.getRecordDescriptor();
+ inputFrameTupleAccessor = new FrameTupleAccessor(inputRecordDescriptor);
+ tupleAppender = new FrameTupleAppender();
+ frame = new VSizeFrame(ctx);
+ tupleAppender.reset(frame, true);
+ this.connectionId = connectionId;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ downstreamWriter.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inputFrameTupleAccessor.reset(buffer);
+ int nTuple = inputFrameTupleAccessor.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tupleBuilder.addField(inputFrameTupleAccessor, t, 0);
+ appendTupleToFrame();
+ tupleBuilder.reset();
+ }
+ }
+
+ private void appendTupleToFrame() throws HyracksDataException {
+ if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(frame.getBuffer(), downstreamWriter);
+ tupleAppender.reset(frame, true);
+ if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException("tuple too large to fit in an empty frame");
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ downstreamWriter.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ downstreamWriter.close();
+ }
+
+ @Override
+ public FeedId getFeedId() {
+ return connectionId.getFeedId();
+ }
+
+ @Override
+ public Type getType() {
+ return Type.COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER;
+ }
+
+ public IFrameWriter getDownstreamWriter() {
+ return downstreamWriter;
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public void reset(IFrameWriter writer) {
+ this.downstreamWriter = writer;
+ }
+
+}
\ No newline at end of file
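
appendTupleToFrame() above follows the standard append-or-flush idiom: try to append, flush the frame downstream when it is full, retry once on the emptied frame, and treat a second failure as a tuple too large for any frame. Below is a self-contained sketch of the same control flow over a plain buffer, illustrative only — the buffer type and capacity here are hypothetical, not Hyracks APIs:

    public class AppendOrFlushSketch {
        private static final int CAPACITY = 16; // hypothetical frame size in characters
        private final StringBuilder frame = new StringBuilder();

        public void add(String tuple) {
            if (!tryAppend(tuple)) {
                flush(); // ship the full frame downstream, then retry on an empty frame
                if (!tryAppend(tuple)) {
                    throw new IllegalStateException("tuple larger than an empty frame");
                }
            }
        }

        private boolean tryAppend(String tuple) {
            if (frame.length() + tuple.length() > CAPACITY) {
                return false; // would overflow; caller must flush first
            }
            frame.append(tuple);
            return true;
        }

        private void flush() {
            System.out.println("flush: " + frame);
            frame.setLength(0);
        }

        public static void main(String[] args) {
            AppendOrFlushSketch w = new AppendOrFlushSketch();
            w.add("0123456789"); // fits in the empty frame
            w.add("abcdefghij"); // forces a flush, then fits
            w.flush();           // drain the remainder
        }
    }
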
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
deleted file mode 100644
index b71be24..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.logging.Level;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
-import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
-import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
-import edu.uci.ics.asterix.runtime.operators.file.IDataParser;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public class ConditionalPushTupleParserFactory implements ITupleParserFactory {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException {
- IDataParser dataParser = null;
- switch (parserType) {
- case ADM:
- dataParser = new ADMDataParser();
- break;
- case DELIMITED_DATA:
- dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter, quote, hasHeader);
- break;
- }
- return new ConditionalPushTupleParser(ctx, recordType, dataParser, configuration);
- }
-
- private final ARecordType recordType;
- private final Map<String, String> configuration;
- private IValueParserFactory[] valueParserFactories;
- private char delimiter;
- private char quote;
- private boolean hasHeader;
- private final ParserType parserType;
-
- public enum ParserType {
- ADM,
- DELIMITED_DATA
- }
-
- public ConditionalPushTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
- char fieldDelimiter, char quote, boolean hasHeader, Map<String, String> configuration) {
- this.recordType = recordType;
- this.valueParserFactories = valueParserFactories;
- this.delimiter = fieldDelimiter;
- this.quote = quote;
- this.hasHeader = hasHeader;
- this.configuration = configuration;
- this.parserType = ParserType.DELIMITED_DATA;
- }
-
- public ConditionalPushTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
- this.recordType = recordType;
- this.configuration = configuration;
- this.parserType = ParserType.ADM;
- }
-}
-
-class ConditionalPushTupleParser extends AbstractTupleParser {
-
- private final IDataParser dataParser;
- private int batchSize;
- private long batchInterval;
- private boolean continueIngestion = true;
- private int tuplesInFrame = 0;
- private TimeBasedFlushTask flushTask;
- private Timer timer = new Timer();
- private Object lock = new Object();
- private boolean activeTimer = false;
-
- public static final String BATCH_SIZE = "batch-size";
- public static final String BATCH_INTERVAL = "batch-interval";
-
- public ConditionalPushTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
- Map<String, String> configuration) throws HyracksDataException {
- super(ctx, recType);
- this.dataParser = dataParser;
- String propValue = (String) configuration.get(BATCH_SIZE);
- batchSize = propValue != null ? Integer.parseInt(propValue) : Integer.MAX_VALUE;
- propValue = (String) configuration.get(BATCH_INTERVAL);
- batchInterval = propValue != null ? Long.parseLong(propValue) : -1;
- activeTimer = batchInterval > 0;
- }
-
- public void stop() {
- continueIngestion = false;
- }
-
- @Override
- public IDataParser getDataParser() {
- return dataParser;
- }
-
- @Override
- public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
- flushTask = new TimeBasedFlushTask(writer, lock);
- IDataParser parser = getDataParser();
- try {
- parser.initialize(in, recType, true);
- if (activeTimer) {
- timer.schedule(flushTask, 0, batchInterval);
- }
- while (continueIngestion) {
- tb.reset();
- if (!parser.parse(tb.getDataOutput())) {
- break;
- }
- tb.addFieldEndOffset();
- addTuple(writer);
- }
- if (appender.getTupleCount() > 0) {
- if (activeTimer) {
- synchronized (lock) {
- appender.flush(writer, true);
- }
- } else {
- appender.flush(writer, true);
- }
- }
- } catch (AsterixException ae) {
- throw new HyracksDataException(ae);
- } catch (IOException ioe) {
- throw new HyracksDataException(ioe);
- } finally {
- if (activeTimer) {
- timer.cancel();
- }
- }
- }
-
- protected void addTuple(IFrameWriter writer) throws HyracksDataException {
- if (activeTimer) {
- synchronized (lock) {
- addTupleToFrame(writer);
- }
- } else {
- addTupleToFrame(writer);
- }
- }
-
- protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
- if (tuplesInFrame == batchSize || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- appender.flush(writer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- if (tuplesInFrame == batchSize) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Batch size exceeded! flushing frame " + "(" + tuplesInFrame + ")");
- }
- }
- tuplesInFrame = 0;
- }
- tuplesInFrame++;
- }
-
- private class TimeBasedFlushTask extends TimerTask {
-
- private IFrameWriter writer;
- private final Object lock;
-
- public TimeBasedFlushTask(IFrameWriter writer, Object lock) {
- this.writer = writer;
- this.lock = lock;
- }
-
- @Override
- public void run() {
- try {
- if (tuplesInFrame > 0) {
- synchronized (lock) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
- }
- appender.flush(writer, true);
- tuplesInFrame = 0;
- }
- }
- } catch (HyracksDataException e) {
- e.printStackTrace();
- }
- }
-
- }
-
-}
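
The removed ConditionalPushTupleParser combined two flush triggers: a batch-size check on the ingestion thread and a java.util.Timer firing every batch-interval, both synchronized on a shared lock so the two paths never flush concurrently. A stripped-down sketch of that pattern, illustrative only (sizes, intervals, and the string "batch" are hypothetical stand-ins for the frame appender):

    import java.util.Timer;
    import java.util.TimerTask;

    public class TimedBatchFlushSketch {
        private final Object lock = new Object();
        private final StringBuilder batch = new StringBuilder();
        private static final int BATCH_SIZE = 5;       // tuples per forced flush
        private static final long BATCH_INTERVAL = 50; // ms between timer flushes
        private int tuplesInBatch = 0;

        public void run() throws InterruptedException {
            Timer timer = new Timer(true); // daemon, like the removed parser's timer
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    synchronized (lock) {
                        if (tuplesInBatch > 0) {
                            flush("interval expired");
                        }
                    }
                }
            }, 0, BATCH_INTERVAL);
            for (int i = 0; i < 12; i++) {
                synchronized (lock) {
                    batch.append(i).append(' ');
                    if (++tuplesInBatch == BATCH_SIZE) {
                        flush("batch full");
                    }
                }
                Thread.sleep(10); // simulate a slow source
            }
            timer.cancel();
            synchronized (lock) {
                if (tuplesInBatch > 0) {
                    flush("end of input");
                }
            }
        }

        private void flush(String reason) { // callers must hold the lock
            System.out.println(reason + ": " + batch);
            batch.setLength(0);
            tuplesInBatch = 0;
        }

        public static void main(String[] args) throws InterruptedException {
            new TimedBatchFlushSketch().run();
        }
    }
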
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
index 06de1e0..536c702 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
new file mode 100644
index 0000000..b67b813
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.IngestionRuntime;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * FeedCollectOperatorDescriptor is responsible for collecting tuples from a source feed. Depending
+ * on the subscription location, it receives its input either from the intake stage or from the
+ * compute stage of the source feed's pipeline.
+ */
+public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorDescriptor.class.getName());
+
+ /** The type associated with the ADM data output from the feed adaptor */
+ private final IAType outputType;
+
+ /** Unique identifier for the feed connection. */
+ private final FeedConnectionId connectionId;
+
+ /** Map representation of policy parameters */
+ private final Map<String, String> feedPolicyProperties;
+
+ /** The (singleton) instance of {@code IFeedSubscriptionManager} */
+ private IFeedSubscriptionManager subscriptionManager;
+
+ /** The source feed from which this feed derives its data. */
+ private final FeedId sourceFeedId;
+
+ /** The subscription location at which the recipient feed receives tuples from the source feed. */
+ private final ConnectionLocation subscriptionLocation;
+
+ public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, FeedId sourceFeedId,
+ ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
+ ConnectionLocation subscriptionLocation) {
+ super(spec, 0, 1);
+ recordDescriptors[0] = rDesc;
+ this.outputType = atype;
+ this.connectionId = feedConnectionId;
+ this.feedPolicyProperties = feedPolicyProperties;
+ this.sourceFeedId = sourceFeedId;
+ this.subscriptionLocation = subscriptionLocation;
+ }
+
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
+ this.subscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
+ ISubscribableRuntime sourceRuntime = null;
+ IOperatorNodePushable nodePushable = null;
+ switch (subscriptionLocation) {
+ case SOURCE_FEED_INTAKE_STAGE:
+ try {
+ SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+ FeedRuntimeType.INTAKE, partition);
+ sourceRuntime = getIntakeRuntime(feedSubscribableRuntimeId);
+ if (sourceRuntime == null) {
+ throw new HyracksDataException("Source intake task not found for source feed id "
+ + sourceFeedId);
+ }
+ nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
+ feedPolicyProperties, partition, nPartitions, sourceRuntime);
+
+ } catch (Exception exception) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Initialization of the feed adaptor failed with exception " + exception);
+ }
+ throw new HyracksDataException("Initialization of the feed collect operator failed", exception);
+ }
+ break;
+ case SOURCE_FEED_COMPUTE_STAGE:
+ SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+ FeedRuntimeType.COMPUTE, partition);
+ sourceRuntime = (ISubscribableRuntime) subscriptionManager
+ .getSubscribableRuntime(feedSubscribableRuntimeId);
+ if (sourceRuntime == null) {
+ throw new HyracksDataException("Source compute task not found for source feed id " + sourceFeedId
+ + " " + FeedRuntimeType.COMPUTE + "[" + partition + "]");
+ }
+ nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
+ feedPolicyProperties, partition, nPartitions, sourceRuntime);
+ break;
+ }
+ return nodePushable;
+ }
+
+ public FeedConnectionId getFeedConnectionId() {
+ return connectionId;
+ }
+
+ public Map<String, String> getFeedPolicyProperties() {
+ return feedPolicyProperties;
+ }
+
+ public IAType getOutputType() {
+ return outputType;
+ }
+
+ public RecordDescriptor getRecordDescriptor() {
+ return recordDescriptors[0];
+ }
+
+ public FeedId getSourceFeedId() {
+ return sourceFeedId;
+ }
+
+ private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
+ int waitCycleCount = 0;
+ ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
+ while (ingestionRuntime == null && waitCycleCount < 10) {
+ try {
+ Thread.sleep(2000);
+ waitCycleCount++;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore interrupt status instead of swallowing it
+ break;
+ }
+ ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
+ }
+ return (IngestionRuntime) ingestionRuntime;
+ }
+
+ public ConnectionLocation getSubscriptionLocation() {
+ return subscriptionLocation;
+ }
+}
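
getIntakeRuntime() above waits for the intake runtime to register by bounded polling: up to ten two-second sleeps before giving up, after which the caller must handle a null result. The same idiom in isolation, illustrative only — the AtomicReference registry is a hypothetical stand-in for subscriptionManager.getSubscribableRuntime():

    import java.util.concurrent.atomic.AtomicReference;

    public class BoundedPollSketch {
        // Hypothetical registry; in the operator this is the subscription manager.
        static final AtomicReference<String> registry = new AtomicReference<String>();

        static String waitForRuntime(int maxCycles, long sleepMillis) {
            String runtime = registry.get();
            int cycles = 0;
            while (runtime == null && cycles < maxCycles) {
                try {
                    Thread.sleep(sleepMillis);
                    cycles++;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    break;
                }
                runtime = registry.get();
            }
            return runtime; // may still be null: caller must handle absence
        }

        public static void main(String[] args) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(120); // the runtime registers a moment later
                    } catch (InterruptedException ignored) {
                    }
                    registry.set("ingestion-runtime[0]");
                }
            }).start();
            System.out.println("got: " + waitForRuntime(10, 50));
        }
    }
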