You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:04 UTC
[06/21] incubator-asterixdb git commit: First stage of external data
cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFactory.java
index 2ccc91c..5202093 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFactory.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.library;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
public class SumFactory implements IFunctionFactory {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFunction.java
index d2c9e1b..d81f01b 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFunction.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.external.library;
-import org.apache.asterix.external.library.IExternalScalarFunction;
-import org.apache.asterix.external.library.IFunctionHelper;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
import org.apache.asterix.external.library.java.JObjects.JInt;
public class SumFunction implements IExternalScalarFunction {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFactory.java
index 0d738da..f74ed38 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFactory.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.external.library;
-import org.apache.asterix.external.library.IExternalFunction;
-import org.apache.asterix.external.library.IFunctionFactory;
+import org.apache.asterix.external.api.IExternalFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
public class UpperCaseFactory implements IFunctionFactory {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
index 56121b0..70bd3e1 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
@@ -23,6 +23,8 @@ import java.util.Random;
import org.apache.asterix.external.library.java.JObjects.JInt;
import org.apache.asterix.external.library.java.JObjects.JRecord;
import org.apache.asterix.external.library.java.JObjects.JString;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
import org.apache.asterix.external.library.java.JTypeTag;
/**
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index 39f8271..df0fb94 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -27,14 +27,13 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
import org.apache.asterix.external.dataset.adapter.StreamBasedAdapter;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-public class TestTypedAdapter extends StreamBasedAdapter implements IFeedAdapter {
+public class TestTypedAdapter extends StreamBasedAdapter {
private static final long serialVersionUID = 1L;
@@ -126,17 +125,13 @@ public class TestTypedAdapter extends StreamBasedAdapter implements IFeedAdapter
}
@Override
- public DataExchangeMode getDataExchangeMode() {
- return DataExchangeMode.PUSH;
- }
-
- @Override
- public void stop() throws Exception {
+ public boolean stop() throws Exception {
generator.stop();
+ return true;
}
@Override
- public boolean handleException(Exception e) {
+ public boolean handleException(Throwable e) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index c177a58..6b08f3a 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -18,26 +18,30 @@
*/
package org.apache.asterix.external.library.adapter;
+import java.io.InputStream;
import java.util.Map;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory.SupportedOperation;
-import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.parser.ADMDataParser;
+import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.std.file.ITupleParser;
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-public class TestTypedAdapterFactory implements IFeedAdapterFactory {
+public class TestTypedAdapterFactory implements IAdapterFactory {
private static final long serialVersionUID = 1L;
- public static final String NAME = "test_typed_adapter";
-
private ARecordType outputType;
public static final String KEY_NUM_OUTPUT_RECORDS = "num_output_records";
@@ -45,13 +49,8 @@ public class TestTypedAdapterFactory implements IFeedAdapterFactory {
private Map<String, String> configuration;
@Override
- public SupportedOperation getSupportedOperations() {
- return SupportedOperation.READ;
- }
-
- @Override
- public String getName() {
- return NAME;
+ public String getAlias() {
+ return "test_typed";
}
@Override
@@ -60,9 +59,47 @@ public class TestTypedAdapterFactory implements IFeedAdapterFactory {
}
@Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- ITupleParserFactory tupleParserFactory = new AsterixTupleParserFactory(configuration, outputType,
- InputDataFormat.ADM);
+ public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+ ITupleParserFactory tupleParserFactory = new ITupleParserFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITupleParser createTupleParser(final IHyracksCommonContext ctx) throws HyracksDataException {
+ ADMDataParser parser;
+ ITupleForwarder forwarder;
+ ArrayTupleBuilder tb;
+ try {
+ parser = new ADMDataParser();
+ forwarder = DataflowUtils.getTupleForwarder(configuration);
+ forwarder.configure(configuration);
+ tb = new ArrayTupleBuilder(1);
+ } catch (AsterixException e) {
+ throw new HyracksDataException(e);
+ }
+ return new ITupleParser() {
+
+ @Override
+ public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+ try {
+ parser.configure(configuration, outputType);
+ parser.setInputStream(in);
+ forwarder.initialize(ctx, writer);
+ while (true) {
+ tb.reset();
+ if (!parser.parse(tb.getDataOutput())) {
+ break;
+ }
+ tb.addFieldEndOffset();
+ forwarder.addTuple(tb);
+ }
+ forwarder.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+ };
return new TestTypedAdapter(tupleParserFactory, outputType, ctx, configuration, partition);
}
@@ -77,14 +114,4 @@ public class TestTypedAdapterFactory implements IFeedAdapterFactory {
this.outputType = outputType;
}
- @Override
- public boolean isRecordTrackingEnabled() {
- return false;
- }
-
- @Override
- public IIntakeProgressTracker createIntakeProgressTracker() {
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java b/asterix-external-data/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
new file mode 100644
index 0000000..698e414
--- /dev/null
+++ b/asterix-external-data/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operator.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.external.parser.ADMDataParser;
+import org.apache.asterix.om.base.AMutableInterval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import junit.extensions.PA;
+
+public class ADMDataParserTest {
+
+ @Test
+ public void test() {
+ String[] dateIntervals = { "-9537-08-04, 9656-06-03", "-9537-04-04, 9656-06-04", "-9537-10-04, 9626-09-05" };
+ AMutableInterval[] parsedDateIntervals = new AMutableInterval[] {
+ new AMutableInterval(-4202630, 2807408, (byte) 17), new AMutableInterval(-4202752, 2807409, (byte) 17),
+ new AMutableInterval(-4202569, 2796544, (byte) 17), };
+
+ String[] timeIntervals = { "12:04:45.689Z, 12:41:59.002Z", "12:10:45.169Z, 15:37:48.736Z",
+ "04:16:42.321Z, 12:22:56.816Z" };
+ AMutableInterval[] parsedTimeIntervals = new AMutableInterval[] {
+ new AMutableInterval(43485689, 45719002, (byte) 18),
+ new AMutableInterval(43845169, 56268736, (byte) 18),
+ new AMutableInterval(15402321, 44576816, (byte) 18), };
+
+ String[] dateTimeIntervals = { "-2640-10-11T17:32:15.675Z, 4104-02-01T05:59:11.902Z",
+ "0534-12-08T08:20:31.487Z, 6778-02-16T22:40:21.653Z",
+ "2129-12-12T13:18:35.758Z, 8647-07-01T13:10:19.691Z" };
+ AMutableInterval[] parsedDateTimeIntervals = new AMutableInterval[] {
+ new AMutableInterval(-145452954464325L, 67345192751902L, (byte) 16),
+ new AMutableInterval(-45286270768513L, 151729886421653L, (byte) 16),
+ new AMutableInterval(5047449515758L, 210721439419691L, (byte) 16) };
+
+ Thread[] threads = new Thread[16];
+ AtomicInteger errorCount = new AtomicInteger(0);
+ for (int i = 0; i < threads.length; ++i) {
+ threads[i] = new Thread(new Runnable() {
+ ADMDataParser parser = new ADMDataParser();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutput dos = new DataOutputStream(bos);
+
+ @Override
+ public void run() {
+ try {
+ int round = 0;
+ while (round++ < 10000) {
+ // Test parseDateInterval.
+ for (int index = 0; index < dateIntervals.length; ++index) {
+ PA.invokeMethod(parser, "parseDateInterval(java.lang.String, java.io.DataOutput)",
+ dateIntervals[index], dos);
+ AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
+ Assert.assertTrue(aInterval.equals(parsedDateIntervals[index]));
+ }
+
+ // Tests parseTimeInterval.
+ for (int index = 0; index < timeIntervals.length; ++index) {
+ PA.invokeMethod(parser, "parseTimeInterval(java.lang.String, java.io.DataOutput)",
+ timeIntervals[index], dos);
+ AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
+ Assert.assertTrue(aInterval.equals(parsedTimeIntervals[index]));
+ }
+
+ // Tests parseDateTimeInterval.
+ for (int index = 0; index < dateTimeIntervals.length; ++index) {
+ PA.invokeMethod(parser, "parseDateTimeInterval(java.lang.String, java.io.DataOutput)",
+ dateTimeIntervals[index], dos);
+ AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
+ Assert.assertTrue(aInterval.equals(parsedDateTimeIntervals[index]));
+ }
+ }
+ } catch (Exception e) {
+ errorCount.incrementAndGet();
+ e.printStackTrace();
+ }
+ }
+ });
+ // Kicks off test threads.
+ threads[i].start();
+ }
+
+ // Joins all the threads.
+ try {
+ for (int i = 0; i < threads.length; ++i) {
+ threads[i].join();
+ }
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ // Asserts no failure.
+ Assert.assertTrue(errorCount.get() == 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
index 0613498..42827b4 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
@@ -20,7 +20,7 @@ import java.util.Collection;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.IdentitiyResolverFactory;
import org.apache.asterix.test.aql.TestExecutor;
import org.apache.asterix.test.runtime.HDFSCluster;
@@ -43,8 +43,8 @@ public abstract class AbstractExecutionIT {
protected static final Logger LOGGER = Logger.getLogger(AbstractExecutionIT.class.getName());
protected static final String PATH_ACTUAL = "ittest" + File.separator;
- protected static final String PATH_BASE = StringUtils.join(new String[] { "..", "asterix-app", "src", "test",
- "resources", "runtimets" }, File.separator);
+ protected static final String PATH_BASE = StringUtils
+ .join(new String[] { "..", "asterix-app", "src", "test", "resources", "runtimets" }, File.separator);
protected static final String HDFS_BASE = "../asterix-app/";
@@ -63,21 +63,21 @@ public abstract class AbstractExecutionIT {
//This is nasty but there is no very nice way to set a system property on each NC that I can figure.
//The main issue is that we need the NC resolver to be the IdentityResolver and not the DNSResolver.
- FileUtils.copyFile(
- new File(StringUtils.join(new String[] { "src", "test", "resources", "integrationts", "asterix-configuration.xml" }, File.separator)),
+ FileUtils
+ .copyFile(
+ new File(StringUtils.join(new String[] { "src", "test", "resources", "integrationts",
+ "asterix-configuration.xml" }, File.separator)),
new File(AsterixInstallerIntegrationUtil.getManagixHome() + "/conf/asterix-configuration.xml"));
AsterixLifecycleIT.setUp();
-
FileUtils.copyDirectoryStructure(
new File(StringUtils.join(new String[] { "..", "asterix-app", "data" }, File.separator)),
new File(AsterixInstallerIntegrationUtil.getManagixHome() + "/clusters/local/working_dir/data"));
-
// Set the node resolver to be the identity resolver that expects node names
// to be node controller ids; a valid assumption in test environment.
- System.setProperty(FileSystemBasedAdapter.NODE_RESOLVER_FACTORY_PROPERTY,
+ System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
IdentitiyResolverFactory.class.getName());
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
index 438bb05..1da01c3 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
@@ -43,16 +43,21 @@ public class AsterixExternalLibraryIT {
@BeforeClass
public static void setUp() throws Exception {
- AsterixInstallerIntegrationUtil.init();
- File asterixInstallerProjectDir = new File(System.getProperty("user.dir"));
- String asterixExternalLibraryPath = asterixInstallerProjectDir.getParentFile().getAbsolutePath()
- + File.separator + LIBRARY_PATH;
- LOGGER.info("Installing library :" + LIBRARY_NAME + " located at " + asterixExternalLibraryPath
- + " in dataverse " + LIBRARY_DATAVERSE);
- AsterixInstallerIntegrationUtil.installLibrary(LIBRARY_NAME, LIBRARY_DATAVERSE, asterixExternalLibraryPath);
- AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
- TestCaseContext.Builder b = new TestCaseContext.Builder();
- testCaseCollection = b.build(new File(PATH_BASE));
+ try {
+ AsterixInstallerIntegrationUtil.init();
+ File asterixInstallerProjectDir = new File(System.getProperty("user.dir"));
+ String asterixExternalLibraryPath = asterixInstallerProjectDir.getParentFile().getAbsolutePath()
+ + File.separator + LIBRARY_PATH;
+ LOGGER.info("Installing library :" + LIBRARY_NAME + " located at " + asterixExternalLibraryPath
+ + " in dataverse " + LIBRARY_DATAVERSE);
+ AsterixInstallerIntegrationUtil.installLibrary(LIBRARY_NAME, LIBRARY_DATAVERSE, asterixExternalLibraryPath);
+ AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
+ TestCaseContext.Builder b = new TestCaseContext.Builder();
+ testCaseCollection = b.build(new File(PATH_BASE));
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
index c000d55..34a8733 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
@@ -93,6 +93,7 @@ public class AsterixInstallerIntegrationUtil {
String command = "shutdown";
cmdHandler.processCommand(command.split(" "));
+ //TODO: This must be fixed, an arbitrary wait for 2s is not a reliable way to make sure the process have completed successfully.
Thread.sleep(2000);
// start zookeeper
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/ClusterExecutionIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ClusterExecutionIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ClusterExecutionIT.java
index 93e9f6d..cf69e1a 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ClusterExecutionIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ClusterExecutionIT.java
@@ -18,10 +18,12 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.logging.Level;
-import java.util.logging.Logger;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.IdentitiyResolverFactory;
import org.apache.asterix.test.aql.TestExecutor;
import org.apache.asterix.test.runtime.HDFSCluster;
+import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.plexus.util.FileUtils;
import org.junit.AfterClass;
@@ -31,20 +33,16 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import org.apache.asterix.external.util.IdentitiyResolverFactory;
-import org.apache.asterix.testframework.context.TestCaseContext;
-
/**
* Runs the runtime test cases under 'asterix-app/src/test/resources/runtimets'.
*/
@RunWith(Parameterized.class)
-public class ClusterExecutionIT extends AbstractExecutionIT{
+public class ClusterExecutionIT extends AbstractExecutionIT {
private static final String CLUSTER_CC_ADDRESS = "10.10.0.2";
private static final int CLUSTER_CC_API_PORT = 19002;
- private final static TestExecutor testExecutor = new TestExecutor(CLUSTER_CC_ADDRESS,CLUSTER_CC_API_PORT);
+ private final static TestExecutor testExecutor = new TestExecutor(CLUSTER_CC_ADDRESS, CLUSTER_CC_API_PORT);
@BeforeClass
public static void setUp() throws Exception {
@@ -60,13 +58,14 @@ public class ClusterExecutionIT extends AbstractExecutionIT{
AsterixClusterLifeCycleIT.setUp();
FileUtils.copyDirectoryStructure(
- new File(StringUtils.join(new String[] { "..", "asterix-app", "data" }, File.separator)), new File(
- StringUtils.join(new String[] { "src", "test", "resources", "clusterts", "managix-working", "data" },
+ new File(StringUtils.join(new String[] { "..", "asterix-app", "data" }, File.separator)),
+ new File(StringUtils.join(
+ new String[] { "src", "test", "resources", "clusterts", "managix-working", "data" },
File.separator)));
// Set the node resolver to be the identity resolver that expects node names
// to be node controller ids; a valid assumption in test environment.
- System.setProperty(FileSystemBasedAdapter.NODE_RESOLVER_FACTORY_PROPERTY,
+ System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
IdentitiyResolverFactory.class.getName());
}
@@ -100,6 +99,7 @@ public class ClusterExecutionIT extends AbstractExecutionIT{
this.tcCtx = tcCtx;
}
+ @Override
@Test
public void test() throws Exception {
testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixExecutionIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixExecutionIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixExecutionIT.java
index 17184c7..492f173 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixExecutionIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixExecutionIT.java
@@ -14,25 +14,9 @@
*/
package org.apache.asterix.installer.test;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import org.apache.asterix.external.util.IdentitiyResolverFactory;
-import org.apache.asterix.test.aql.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.asterix.test.runtime.HDFSCluster;
-import org.codehaus.plexus.util.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
/**
* Runs the runtime test cases under 'asterix-app/src/test/resources/runtimets'.
@@ -40,7 +24,6 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class ManagixExecutionIT extends AbstractExecutionIT {
-
private TestCaseContext tcCtx;
public ManagixExecutionIT(TestCaseContext tcCtx) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
index 2e66afd..b9c2072 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
@@ -17,19 +17,8 @@ package org.apache.asterix.installer.test;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import org.apache.asterix.external.util.IdentitiyResolverFactory;
-import org.apache.asterix.test.aql.TestExecutor;
-import org.apache.asterix.test.runtime.HDFSCluster;
import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.commons.lang3.StringUtils;
-import org.codehaus.plexus.util.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -38,7 +27,7 @@ import org.junit.runners.Parameterized.Parameters;
* Runs the runtime test cases under 'asterix-app/src/test/resources/runtimets'.
*/
@RunWith(Parameterized.class)
-public class ManagixSqlppExecutionIT extends ManagixExecutionIT{
+public class ManagixSqlppExecutionIT extends ManagixExecutionIT {
@Parameters
public static Collection<Object[]> tests() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index a62abaa..71c762a 100644
--- a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -29,7 +29,7 @@ import org.apache.asterix.common.feeds.FeedConnectionRequest;
import org.apache.asterix.common.feeds.FeedId;
import org.apache.asterix.common.feeds.FeedPolicyAccessor;
import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.lang.aql.parser.AQLParserFactory;
import org.apache.asterix.lang.common.base.IParser;
import org.apache.asterix.lang.common.base.IParserFactory;
@@ -188,7 +188,7 @@ public class SubscribeFeedStatement implements Statement {
try {
switch (feed.getFeedType()) {
case PRIMARY:
- Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+ Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) feed, policyAccessor,
mdTxnCtx);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
----------------------------------------------------------------------
diff --git a/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java b/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
index 3e21653..a113864 100644
--- a/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
+++ b/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
@@ -72,11 +72,34 @@ public class [LEXER_NAME] {
// ================================================================================
// Public interface
// ================================================================================
-
+
public [LEXER_NAME](java.io.Reader stream) throws IOException{
reInit(stream);
}
+ public [LEXER_NAME]() throws IOException{
+ reInit();
+ }
+
+ public void setBuffer(char[] buffer){
+ this.buffer = buffer;
+ tokenBegin = bufpos = 0;
+ containsEscapes = false;
+ line++;
+ tokenBegin = -1;
+ }
+
+ public void reInit(){
+ bufsize = Integer.MAX_VALUE;
+ endOf_UNUSED_Buffer = bufsize;
+ endOf_USED_Buffer = bufsize;
+ line = 0;
+ prevCharIsCR = false;
+ prevCharIsLF = false;
+ tokenBegin = -1;
+ maxUnusedBufferSize = bufsize;
+ }
+
public void reInit(java.io.Reader stream) throws IOException{
done();
inputStream = stream;
@@ -239,5 +262,5 @@ public class [LEXER_NAME] {
bufsize += maxUnusedBufferSize;
endOf_UNUSED_Buffer = bufsize;
tokenBegin = 0;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 5317fc2..a73a236 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -39,9 +39,13 @@ import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.context.BaseOperationTracker;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory;
+import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.runtime.GenericSocketFeedAdapterFactory;
+import org.apache.asterix.external.runtime.SocketClientAdapterFactory;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
@@ -76,12 +80,14 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceFactory;
@@ -316,19 +322,8 @@ public class MetadataBootstrap {
}
private static void insertInitialAdapters(MetadataTransactionContext mdTxnCtx) throws Exception {
- String[] builtInAdapterClassNames = new String[] {
- "org.apache.asterix.external.adapter.factory.PullBasedAzureTwitterAdapterFactory",
- "org.apache.asterix.external.adapter.factory.NCFileSystemAdapterFactory",
- "org.apache.asterix.external.adapter.factory.HDFSAdapterFactory",
- "org.apache.asterix.external.adapter.factory.HiveAdapterFactory",
- "org.apache.asterix.external.adapter.factory.PullBasedTwitterAdapterFactory",
- "org.apache.asterix.external.adapter.factory.PushBasedTwitterAdapterFactory",
- "org.apache.asterix.external.adapter.factory.RSSFeedAdapterFactory",
- "org.apache.asterix.external.adapter.factory.CNNFeedAdapterFactory",
- "org.apache.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory",
- "org.apache.asterix.tools.external.data.TwitterFirehoseFeedAdapterFactory",
- "org.apache.asterix.tools.external.data.GenericSocketFeedAdapterFactory",
- "org.apache.asterix.tools.external.data.SocketClientAdapterFactory" };
+ String[] builtInAdapterClassNames = new String[] { GenericAdapterFactory.class.getName(),
+ GenericSocketFeedAdapterFactory.class.getName(), SocketClientAdapterFactory.class.getName() };
DatasourceAdapter adapter;
for (String adapterClassName : builtInAdapterClassNames) {
adapter = getAdapter(adapterClassName);
@@ -349,11 +344,9 @@ public class MetadataBootstrap {
}
private static void insertInitialCompactionPolicies(MetadataTransactionContext mdTxnCtx) throws Exception {
- String[] builtInCompactionPolicyClassNames = new String[] {
- "org.apache.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory",
- "org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory",
- "org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory",
- "org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory" };
+ String[] builtInCompactionPolicyClassNames = new String[] { ConstantMergePolicyFactory.class.getName(),
+ PrefixMergePolicyFactory.class.getName(), NoMergePolicyFactory.class.getName(),
+ CorrelatedPrefixMergePolicyFactory.class.getName() };
CompactionPolicy compactionPolicy;
for (String policyClassName : builtInCompactionPolicyClassNames) {
compactionPolicy = getCompactionPolicyEntity(policyClassName);
@@ -362,7 +355,7 @@ public class MetadataBootstrap {
}
private static DatasourceAdapter getAdapter(String adapterFactoryClassName) throws Exception {
- String adapterName = ((IAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getName();
+ String adapterName = ((IAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getAlias();
return new DatasourceAdapter(new AdapterIdentifier(MetadataConstants.METADATA_DATAVERSE_NAME, adapterName),
adapterFactoryClassName, DatasourceAdapter.AdapterType.INTERNAL);
}
@@ -378,8 +371,7 @@ public class MetadataBootstrap {
ClusterPartition metadataPartition = propertiesProvider.getMetadataProperties().getMetadataPartition();
int metadataDeviceId = metadataPartition.getIODeviceNum();
String metadataPartitionPath = SplitsAndConstraintsUtil.prepareStoragePartitionPath(
- AsterixClusterProperties.INSTANCE.getStorageDirectoryName(),
- metadataPartition.getPartitionId());
+ AsterixClusterProperties.INSTANCE.getStorageDirectoryName(), metadataPartition.getPartitionId());
String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 745f436..c9157df 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -23,7 +23,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -52,17 +51,17 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactor
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.parse.IParseFileSplitsDecl;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory.SupportedOperation;
-import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
+import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.external.indexing.dataflow.HDFSLookupAdapterFactory;
-import org.apache.asterix.external.indexing.operators.ExternalLoopkupOperatorDiscriptor;
+import org.apache.asterix.external.operators.ExternalBTreeSearchOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalRTreeSearchOperatorDescriptor;
+import org.apache.asterix.external.provider.AdapterFactoryProvider;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
@@ -100,8 +99,6 @@ import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
-import org.apache.asterix.runtime.external.ExternalBTreeSearchOperatorDescriptor;
-import org.apache.asterix.runtime.external.ExternalRTreeSearchOperatorDescriptor;
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
@@ -153,11 +150,8 @@ import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.primitive.ShortPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
@@ -202,8 +196,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
private final AsterixStorageProperties storageProperties;
- public static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
-
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
}
@@ -490,10 +482,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(JobSpecification jobSpec,
LoadableDataSource alds, IAdapterFactory adapterFactory, RecordDescriptor rDesc, boolean isPKAutoGenerated,
List<List<String>> primaryKeys, ARecordType recType, int pkIndex) throws AlgebricksException {
- if (!(adapterFactory.getSupportedOperations().equals(SupportedOperation.READ)
- || adapterFactory.getSupportedOperations().equals(SupportedOperation.READ_WRITE))) {
- throw new AlgebricksException(" External dataset adapter does not support read operation");
- }
+
ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, rDesc,
adapterFactory);
AlgebricksPartitionConstraint constraint;
@@ -552,24 +541,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
private IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
Map<String, String> configuration, IAType itemType, boolean isPKAutoGenerated,
List<List<String>> primaryKeys) throws AlgebricksException {
- IAdapterFactory adapterFactory;
- DatasourceAdapter adapterEntity;
- String adapterFactoryClassname;
try {
- adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
- adapterName);
- if (adapterEntity != null) {
- adapterFactoryClassname = adapterEntity.getClassname();
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- } else {
- adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
- if (adapterFactoryClassname == null) {
- throw new AlgebricksException(" Unknown adapter :" + adapterName);
- }
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- }
-
- adapterFactory.configure(configuration, (ARecordType) itemType);
+ IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration,
+ (ARecordType) itemType);
// check to see if dataset is indexed
Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -602,11 +576,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
throw new AlgebricksException("Can only scan datasets of records.");
}
- if (!(adapterFactory.getSupportedOperations().equals(SupportedOperation.READ)
- || adapterFactory.getSupportedOperations().equals(SupportedOperation.READ_WRITE))) {
- throw new AlgebricksException(" External dataset adapter does not support read operation");
- }
-
+ @SuppressWarnings("rawtypes")
ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
@@ -623,33 +593,11 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
}
- @SuppressWarnings("rawtypes")
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(JobSpecification jobSpec,
- IAType itemType, IParseFileSplitsDecl decl, IDataFormat format) throws AlgebricksException {
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only scan datasets of records.");
- }
- ARecordType rt = (ARecordType) itemType;
- ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
- FileSplit[] splits = decl.getSplits();
- IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(splits);
- ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
- RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
- IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec, scannerSplitProvider, tupleParser,
- scannerDesc);
- String[] locs = new String[splits.length];
- for (int i = 0; i < splits.length; i++) {
- locs[i] = splits[i].getNodeName();
- }
- AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(locs);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, apc);
- }
-
- public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IFeedAdapterFactory> buildFeedIntakeRuntime(
+ public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
JobSpecification jobSpec, PrimaryFeed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
- Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+ Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx);
- IFeedAdapterFactory adapterFactory = factoryOutput.first;
+ IAdapterFactory adapterFactory = factoryOutput.first;
FeedIntakeOperatorDescriptor feedIngestor = null;
switch (factoryOutput.third) {
case INTERNAL:
@@ -665,7 +613,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
- return new Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IFeedAdapterFactory>(feedIngestor,
+ return new Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory>(feedIngestor,
partitionConstraint, adapterFactory);
}
@@ -1515,7 +1463,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
// Generate Output Record format
- ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
+ ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
@@ -2102,7 +2050,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
* Calculate an estimate size of the bloom filter. Note that this is an
* estimation which assumes that the data is going to be uniformly
* distributed across all partitions.
- *
* @param dataset
* @return Number of elements that will be used to create a bloom filter per
* dataset per partition
@@ -2147,24 +2094,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
}
- private static Map<String, String> initializeAdapterFactoryMapping() {
- Map<String, String> adapterFactoryMapping = new HashMap<String, String>();
- adapterFactoryMapping.put("org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter",
- "org.apache.asterix.external.adapter.factory.NCFileSystemAdapterFactory");
- adapterFactoryMapping.put("org.apache.asterix.external.dataset.adapter.HDFSAdapter",
- "org.apache.asterix.external.adapter.factory.HDFSAdapterFactory");
- adapterFactoryMapping.put("org.apache.asterix.external.dataset.adapter.PullBasedTwitterAdapter",
- "org.apache.asterix.external.dataset.adapter.PullBasedTwitterAdapterFactory");
- adapterFactoryMapping.put("org.apache.asterix.external.dataset.adapter.RSSFeedAdapter",
- "org.apache.asterix.external.dataset.adapter..RSSFeedAdapterFactory");
- adapterFactoryMapping.put("org.apache.asterix.external.dataset.adapter.CNNFeedAdapter",
- "org.apache.asterix.external.dataset.adapter.CNNFeedAdapterFactory");
- adapterFactoryMapping.put("org.apache.asterix.tools.external.data.RateControlledFileSystemBasedAdapter",
- "org.apache.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory");
-
- return adapterFactoryMapping;
- }
-
public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
throws MetadataException {
DatasourceAdapter adapter = null;
@@ -2232,35 +2161,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
return FormatUtils.getDefaultFormat();
}
- /**
- * Add HDFS scheduler and the cluster location constraint into the scheduler
- *
- * @param properties
- * the original dataset properties
- * @return a new map containing the original dataset properties and the
- * scheduler/locations
- */
- private static Map<String, Object> wrapProperties(Map<String, String> properties) {
- Map<String, Object> wrappedProperties = new HashMap<String, Object>();
- wrappedProperties.putAll(properties);
- // wrappedProperties.put(SCHEDULER, hdfsScheduler);
- // wrappedProperties.put(CLUSTER_LOCATIONS, getClusterLocations());
- return wrappedProperties;
- }
-
- /**
- * Adapt the original properties to a string-object map
- *
- * @param properties
- * the original properties
- * @return the new stirng-object map
- */
- private static Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
- Map<String, Object> wrappedProperties = new HashMap<String, Object>();
- wrappedProperties.putAll(properties);
- return wrappedProperties;
- }
-
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
@@ -2284,67 +2184,54 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema,
JobGenContext context, AqlMetadataProvider metadataProvider, boolean retainNull)
throws AlgebricksException {
- // Get data type
- IAType itemType = null;
try {
+ // Get data type
+ IAType itemType = null;
itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
- } catch (MetadataException e) {
- e.printStackTrace();
- throw new AlgebricksException("Unable to get item type from metadata " + e);
- }
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only scan datasets of records.");
- }
- // Create the adapter factory <- right now there is only one. if there are more in the future, we can create a map->
- ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
- HDFSLookupAdapterFactory adapterFactory = new HDFSLookupAdapterFactory();
- adapterFactory.configure(itemType, retainInput, ridIndexes, datasetDetails.getProperties(), retainNull);
+ // Create the adapter factory <- right now there is only one. if there are more in the future, we can create a map->
+ ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getAdapterFactory(
+ datasetDetails.getProperties(), (ARecordType) itemType, ridIndexes, retainInput, retainNull,
+ context.getNullWriterFactory());
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
- try {
- compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
- } catch (MetadataException e) {
- throw new AlgebricksException(" Unabel to create merge policy factory for external dataset", e);
- }
-
- boolean temp = dataset.getDatasetDetails().isTemp();
- // Create the file index data flow helper
- ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
- compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
- metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
- ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider), !temp);
-
- // Create the out record descriptor, appContext and fileSplitProvider for the files index
- RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
- dataset.getDatasetName(), dataset.getDatasetName().
-
- concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
- } catch (
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
+ try {
+ compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+ } catch (MetadataException e) {
+ throw new AlgebricksException(" Unabel to create merge policy factory for external dataset", e);
+ }
- Exception e)
+ boolean temp = datasetDetails.isTemp();
+ // Create the file index data flow helper
+ ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
+ compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
+ ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider), !temp);
- {
+ // Create the out record descriptor, appContext and fileSplitProvider for the files index
+ RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+ spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+ dataset.getDatasetName(),
+ dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
+ ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+ : new SecondaryIndexSearchOperationCallbackFactory();
+ // Create the operator
+ ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
+ outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
+ appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(),
+ metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory,
+ retainNull, context.getNullWriterFactory());
+
+ // Return value
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, spPc.second);
+ } catch (Exception e) {
throw new AlgebricksException(e);
}
-
- ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
- : new SecondaryIndexSearchOperationCallbackFactory();
- // Create the operator
- ExternalLoopkupOperatorDiscriptor op = new ExternalLoopkupOperatorDiscriptor(jobSpec, adapterFactory,
- outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
- appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(),
- metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory,
- retainNull, context.getNullWriterFactory());
-
- // Return value
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, spPc.second);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapter.java
deleted file mode 100644
index f4484cf..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapter.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.declared;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public class FieldExtractingAdapter implements IDatasourceAdapter {
-
- private static final long serialVersionUID = 1L;
-
- private final RecordDescriptor inRecDesc;
-
- private final RecordDescriptor outRecDesc;
-
- private final IDatasourceAdapter wrappedAdapter;
-
- private final FieldExtractingPushRuntime fefw;
-
- public FieldExtractingAdapter(IHyracksTaskContext ctx, RecordDescriptor inRecDesc, RecordDescriptor outRecDesc,
- int[][] extractFields, ARecordType rType, IDatasourceAdapter wrappedAdapter) {
- this.inRecDesc = inRecDesc;
- this.outRecDesc = outRecDesc;
- this.wrappedAdapter = wrappedAdapter;
- fefw = new FieldExtractingPushRuntime(ctx, extractFields, rType);
- }
-
- @Override
- public void start(int partition, IFrameWriter writer) throws Exception {
- fefw.setInputRecordDescriptor(0, inRecDesc);
- fefw.setFrameWriter(0, writer, outRecDesc);
- fefw.open();
- try {
- wrappedAdapter.start(partition, fefw);
- } catch (Throwable t) {
- fefw.fail();
- throw t;
- } finally {
- fefw.close();
- }
- }
-
- private static class FieldExtractingPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
-
- private final IHyracksTaskContext ctx;
-
- private final int[][] extractFields;
-
- private final ARecordType rType;
-
- private final int nullBitmapSize;
-
- private final ArrayTupleBuilder tb;
-
- public FieldExtractingPushRuntime(IHyracksTaskContext ctx, int[][] extractFields, ARecordType rType) {
- this.ctx = ctx;
- this.extractFields = extractFields;
- this.rType = rType;
- nullBitmapSize = ARecordType.computeNullBitmapSize(rType);
- tb = new ArrayTupleBuilder(extractFields.length + 1);
- }
-
- @Override
- public void open() throws HyracksDataException {
- initAccessAppendRef(ctx);
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- for (int i = 0; i < tAccess.getTupleCount(); ++i) {
- tb.reset();
- tRef.reset(tAccess, i);
- byte[] record = tRef.getFieldData(0);
- int recStart = tRef.getFieldStart(0);
- int recLength = tRef.getFieldLength(0);
- for (int f = 0; f < extractFields.length; ++f) {
- try {
- byte[] subRecord = record;
- int subFStart = recStart;
- int subFOffset = 0;
- boolean isNull = false;
- IAType subFType = rType;
- int subFLen = recLength;
- int subBitMapSize = nullBitmapSize;
- byte[] subRecordTmp;
-
- for (int j = 0; j < extractFields[f].length; j++) {
- //Get offset for subfield
- subFOffset = ARecordSerializerDeserializer.getFieldOffsetById(subRecord, subFStart,
- extractFields[f][j], subBitMapSize, ((ARecordType) subFType).isOpen());
- if (subFOffset == 0) {
- tb.getDataOutput().write(ATypeTag.NULL.serialize());
- isNull = true;
- break;
- } else {
- //Get type of subfield
- subFType = ((ARecordType) subFType).getFieldTypes()[extractFields[f][j]];
- try {
- //Get length of subfield
- subFLen = NonTaggedFormatUtil.getFieldValueLength(subRecord,
- subFStart + subFOffset, subFType.getTypeTag(), false);
-
- if (j < extractFields[f].length - 1) {
- subRecordTmp = new byte[subFLen + 1];
- subRecordTmp[0] = subFType.getTypeTag().serialize();
- System.arraycopy(subRecord, subFStart + subFOffset, subRecordTmp, 1, subFLen);
- subRecord = subRecordTmp;
- subFStart = 0;
- subBitMapSize = ARecordType.computeNullBitmapSize((ARecordType) subFType);
- }
-
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- if (!isNull) {
- tb.getDataOutput().write(subFType.getTypeTag().serialize());
- tb.getDataOutput().write(subRecord, subFStart + subFOffset, subFLen);
- }
-
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- tb.addFieldEndOffset();
- }
- tb.addField(record, recStart, tRef.getFieldLength(0));
- appendToFrameFromTupleBuilder(tb);
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- flushIfNotFailed();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java
deleted file mode 100644
index 989e4a3..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.declared;
-
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory.SupportedOperation;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class FieldExtractingAdapterFactory implements IAdapterFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final IAdapterFactory wrappedAdapterFactory;
-
- private final RecordDescriptor inRecDesc;
-
- private final RecordDescriptor outRecDesc;
-
- private final int[][] extractFields;
-
- private final ARecordType rType;
-
- public FieldExtractingAdapterFactory(IAdapterFactory wrappedAdapterFactory, RecordDescriptor inRecDesc,
- RecordDescriptor outRecDesc, int[][] extractFields, ARecordType rType) {
- this.wrappedAdapterFactory = wrappedAdapterFactory;
- this.inRecDesc = inRecDesc;
- this.outRecDesc = outRecDesc;
- this.extractFields = extractFields;
- this.rType = rType;
- }
-
- @Override
- public SupportedOperation getSupportedOperations() {
- return wrappedAdapterFactory.getSupportedOperations();
- }
-
- @Override
- public String getName() {
- return "FieldExtractingAdapter[ " + wrappedAdapterFactory.getName() + " ]";
- }
-
-
- @Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return wrappedAdapterFactory.getPartitionConstraint();
- }
-
- @Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- IDatasourceAdapter wrappedAdapter = wrappedAdapterFactory.createAdapter(ctx, partition);
- return new FieldExtractingAdapter(ctx, inRecDesc, outRecDesc, extractFields, rType, wrappedAdapter);
- }
-
- @Override
- public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
- wrappedAdapterFactory.configure(configuration, outputType);
- }
-
- @Override
- public ARecordType getAdapterOutputType() {
- return wrappedAdapterFactory.getAdapterOutputType();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapter.java
deleted file mode 100644
index e0c5fc0..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapter.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.declared;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.om.base.AMutableUUID;
-import org.apache.asterix.om.base.AUUID;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public class PKGeneratingAdapter implements IDatasourceAdapter {
-
- private static final long serialVersionUID = 1L;
- private final RecordDescriptor inRecDesc;
- private final RecordDescriptor outRecDesc;
- private final IDatasourceAdapter wrappedAdapter;
- private final PKGeneratingPushRuntime pkRuntime;
- private final int pkIndex;
-
- public PKGeneratingAdapter(IHyracksTaskContext ctx, RecordDescriptor inRecDesc, RecordDescriptor outRecDesc,
- ARecordType inRecType, ARecordType outRecType, IDatasourceAdapter wrappedAdapter, int pkIndex) {
- this.inRecDesc = inRecDesc;
- this.outRecDesc = outRecDesc;
- this.wrappedAdapter = wrappedAdapter;
- this.pkRuntime = new PKGeneratingPushRuntime(ctx, inRecType, outRecType);
- this.pkIndex = pkIndex;
- }
-
- @Override
- public void start(int partition, IFrameWriter writer) throws Exception {
- pkRuntime.setInputRecordDescriptor(0, inRecDesc);
- pkRuntime.setFrameWriter(0, writer, outRecDesc);
- pkRuntime.open();
- try {
- wrappedAdapter.start(partition, pkRuntime);
- } catch (Throwable t) {
- pkRuntime.fail();
- throw t;
- } finally {
- pkRuntime.close();
- }
- }
-
- private class PKGeneratingPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
- private final IHyracksTaskContext ctx;
- private final ARecordType outRecType;
- private final ArrayTupleBuilder tb;
- private final AMutableUUID aUUID = new AMutableUUID(0, 0);
- private final byte AUUIDTag = ATypeTag.UUID.serialize();
- private final byte[] serializedUUID = new byte[16];
- private final PointableAllocator pa = new PointableAllocator();
- private final ARecordVisitablePointable recordPointable;
- private final IAType[] outClosedTypes;
-
- private final RecordBuilder recBuilder;
-
- public PKGeneratingPushRuntime(IHyracksTaskContext ctx, ARecordType inRecType, ARecordType outRecType) {
- this.ctx = ctx;
- this.outRecType = outRecType;
- this.tb = new ArrayTupleBuilder(2);
- this.recBuilder = new RecordBuilder();
- this.recordPointable = (ARecordVisitablePointable) pa.allocateRecordValue(inRecType);
- this.outClosedTypes = outRecType.getFieldTypes();
- }
-
- /*
- * We write this method in low level instead of using pre-existing libraries since this will be called for each record and to avoid
- * size validation
- */
- private void serializeUUID(AUUID aUUID, byte[] serializedUUID) {
- long v = aUUID.getLeastSignificantBits();
- serializedUUID[0] = (byte) (v >>> 56);
- serializedUUID[1] = (byte) (v >>> 48);
- serializedUUID[2] = (byte) (v >>> 40);
- serializedUUID[3] = (byte) (v >>> 32);
- serializedUUID[4] = (byte) (v >>> 24);
- serializedUUID[5] = (byte) (v >>> 16);
- serializedUUID[6] = (byte) (v >>> 8);
- serializedUUID[7] = (byte) (v >>> 0);
- v = aUUID.getMostSignificantBits();
- serializedUUID[8] = (byte) (v >>> 56);
- serializedUUID[9] = (byte) (v >>> 48);
- serializedUUID[10] = (byte) (v >>> 40);
- serializedUUID[11] = (byte) (v >>> 32);
- serializedUUID[12] = (byte) (v >>> 24);
- serializedUUID[13] = (byte) (v >>> 16);
- serializedUUID[14] = (byte) (v >>> 8);
- serializedUUID[15] = (byte) (v >>> 0);
- }
-
- @Override
- public void open() throws HyracksDataException {
- initAccessAppendRef(ctx);
- recBuilder.reset(outRecType);
- recBuilder.init();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- try {
- tAccess.reset(buffer);
- for (int i = 0; i < tAccess.getTupleCount(); ++i) {
- tb.reset();
- tRef.reset(tAccess, i);
-
- // We need to do the following:
- // 1. generate a UUID
- // 2. fill in the first field with the UUID
- aUUID.nextUUID();
- tb.getDataOutput().writeByte(AUUIDTag);
- serializeUUID(aUUID, serializedUUID);
- tb.getDataOutput().write(serializedUUID);
- tb.addFieldEndOffset();
- // 3. fill in the second field with the record after adding to it the UUID
- recordPointable.set(tRef.getFieldData(0), tRef.getFieldStart(0), tRef.getFieldLength(0));
- // Start by closed fields
- int inIndex = 0;
- for (int f = 0; f < outClosedTypes.length; f++) {
- if (f == pkIndex) {
- recBuilder.addField(f, serializedUUID);
- } else {
- recBuilder.addField(f, recordPointable.getFieldValues().get(inIndex));
- inIndex++;
- }
- }
-
- // Add open fields
- if (outRecType.isOpen()) {
- List<IVisitablePointable> fp = recordPointable.getFieldNames();
- if (fp.size() >= outClosedTypes.length) {
- int index = outClosedTypes.length - 1;
- while (index < fp.size()) {
- recBuilder.addField(fp.get(index), recordPointable.getFieldValues().get(index));
- index++;
- }
- }
- }
- //write the record
- recBuilder.write(tb.getDataOutput(), true);
- tb.addFieldEndOffset();
- appendToFrameFromTupleBuilder(tb);
- }
- } catch (Exception e) {
- throw new HyracksDataException("Error in the auto id generation and merge of the record", e);
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- flushIfNotFailed();
- }
- }
-
-}