You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that did not survive the plain-text conversion.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:18 UTC
[20/21] incubator-asterixdb git commit: First stage of external data cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
index 822390a..4067508 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.common.feeds.api;
-public interface ITupleTrackingFeedAdapter extends IFeedAdapter {
+public interface ITupleTrackingFeedAdapter extends IDataSourceAdapter {
public void tuplePersistedTimeCallback(long timestamp);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java
deleted file mode 100644
index 87f4c58..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.parse;
-
-import java.util.Map;
-
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-
-public interface IAsterixTupleParser extends ITupleParser{
-
- public void configure(Map<String, String> configuration);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
deleted file mode 100644
index df5a983..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.parse;
-
-import java.util.Map;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public interface ITupleForwardPolicy {
-
- public static final String PARSER_POLICY = "parser-policy";
-
- public enum TupleForwardPolicyType {
- FRAME_FULL,
- COUNTER_TIMER_EXPIRED,
- RATE_CONTROLLED
- }
-
- public void configure(Map<String, String> configuration);
-
- public void initialize(IHyracksCommonContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
-
- public TupleForwardPolicyType getType();
-
- public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
-
- public void close() throws HyracksDataException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
new file mode 100644
index 0000000..5ee065a
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.parse;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public interface ITupleForwarder {
+
+ public static final String FORWARD_POLICY = "forward-policy";
+
+ public enum TupleForwardPolicy {
+ FRAME_FULL,
+ COUNTER_TIMER_EXPIRED,
+ RATE_CONTROLLED
+ }
+
+ public void configure(Map<String, String> configuration);
+
+ public void initialize(IHyracksCommonContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
+
+ public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
+
+ public void close() throws HyracksDataException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index d8147b6..6afe692 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -29,7 +29,6 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -63,12 +62,12 @@ public class TestExecutor {
private static final long MAX_URL_LENGTH = 2000l;
private static Method managixExecuteMethod = null;
- private static String host;
- private static int port;
+ private String host;
+ private int port;
public TestExecutor() {
- this.host = "127.0.0.1";
- this.port = 19002;
+ host = "127.0.0.1";
+ port = 19002;
}
public TestExecutor(String host, int port) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java b/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
index 96fb6ec..a10b5ea 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
@@ -29,6 +29,10 @@ import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.event.error.EventException;
+import org.apache.asterix.event.model.AsterixInstance;
+import org.apache.asterix.installer.schema.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -38,10 +42,6 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.event.error.EventException;
-import org.apache.asterix.event.model.AsterixInstance;
-import org.apache.asterix.installer.schema.conf.Configuration;
public class ZooKeeperService implements ILookupService {
@@ -63,6 +63,7 @@ public class ZooKeeperService implements ILookupService {
private LinkedBlockingQueue<String> msgQ = new LinkedBlockingQueue<String>();
private ZooKeeperWatcher watcher = new ZooKeeperWatcher(msgQ);
+ @Override
public boolean isRunning(Configuration conf) throws Exception {
List<String> servers = conf.getZookeeper().getServers().getServer();
int clientPort = conf.getZookeeper().getClientPort().intValue();
@@ -92,6 +93,7 @@ public class ZooKeeperService implements ILookupService {
return isRunning;
}
+ @Override
public void startService(Configuration conf) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Starting ZooKeeper at " + zkConnectionString);
@@ -107,22 +109,29 @@ public class ZooKeeperService implements ILookupService {
for (String zkServer : zkServers) {
cmdBuffer.append(zkServer + " ");
}
- Runtime.getRuntime().exec(cmdBuffer.toString());
+ //TODO: Create a better way to interact with zookeeper
+ Process zkProcess = Runtime.getRuntime().exec(cmdBuffer.toString());
+ int output = zkProcess.waitFor();
+ if (output != 0) {
+ throw new Exception("Error starting zookeeper server. output code = " + output);
+ }
zk = new ZooKeeper(zkConnectionString, ZOOKEEPER_SESSION_TIME_OUT, watcher);
- String head = msgQ.poll(10, TimeUnit.SECONDS);
+ String head = msgQ.poll(60, TimeUnit.SECONDS);
if (head == null) {
StringBuilder msg = new StringBuilder(
"Unable to start Zookeeper Service. This could be because of the following reasons.\n");
msg.append("1) Managix is incorrectly configured. Please run " + "managix validate"
+ " to run a validation test and correct the errors reported.");
- msg.append("\n2) If validation in (1) is successful, ensure that java_home parameter is set correctly in Managix configuration ("
- + AsterixEventServiceUtil.MANAGIX_CONF_XML + ")");
+ msg.append(
+ "\n2) If validation in (1) is successful, ensure that java_home parameter is set correctly in Managix configuration ("
+ + AsterixEventServiceUtil.MANAGIX_CONF_XML + ")");
throw new Exception(msg.toString());
}
msgQ.take();
createRootIfNotExist();
}
+ @Override
public void stopService(Configuration conf) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Stopping ZooKeeper running at " + zkConnectionString);
@@ -141,6 +150,7 @@ public class ZooKeeperService implements ILookupService {
}
}
+ @Override
public void writeAsterixInstance(AsterixInstance asterixInstance) throws Exception {
String instanceBasePath = ASTERIX_INSTANCE_BASE_PATH + File.separator + asterixInstance.getName();
ByteArrayOutputStream b = new ByteArrayOutputStream();
@@ -166,6 +176,7 @@ public class ZooKeeperService implements ILookupService {
}
}
+ @Override
public AsterixInstance getAsterixInstance(String name) throws Exception {
String path = ASTERIX_INSTANCE_BASE_PATH + File.separator + name;
Stat stat = zk.exists(ASTERIX_INSTANCE_BASE_PATH + File.separator + name, false);
@@ -176,10 +187,12 @@ public class ZooKeeperService implements ILookupService {
return readAsterixInstanceObject(asterixInstanceBytes);
}
+ @Override
public boolean exists(String path) throws Exception {
return zk.exists(ASTERIX_INSTANCE_BASE_PATH + File.separator + path, false) != null;
}
+ @Override
public void removeAsterixInstance(String name) throws Exception {
if (!exists(name)) {
throw new EventException("Asterix instance by name " + name + " does not exists.");
@@ -195,6 +208,7 @@ public class ZooKeeperService implements ILookupService {
zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + name, DEFAULT_NODE_VERSION);
}
+ @Override
public List<AsterixInstance> getAsterixInstances() throws Exception {
List<String> instanceNames = zk.getChildren(ASTERIX_INSTANCE_BASE_PATH, false);
List<AsterixInstance> asterixInstances = new ArrayList<AsterixInstance>();
@@ -207,13 +221,14 @@ public class ZooKeeperService implements ILookupService {
return asterixInstances;
}
- private AsterixInstance readAsterixInstanceObject(byte[] asterixInstanceBytes) throws IOException,
- ClassNotFoundException {
+ private AsterixInstance readAsterixInstanceObject(byte[] asterixInstanceBytes)
+ throws IOException, ClassNotFoundException {
ByteArrayInputStream b = new ByteArrayInputStream(asterixInstanceBytes);
ObjectInputStream ois = new ObjectInputStream(b);
return (AsterixInstance) ois.readObject();
}
+ @Override
public void updateAsterixInstance(AsterixInstance updatedInstance) throws Exception {
removeAsterixInstance(updatedInstance.getName());
writeAsterixInstance(updatedInstance);
@@ -249,6 +264,7 @@ class ZooKeeperWatcher implements Watcher {
this.msgQ = msgQ;
}
+ @Override
public void process(WatchedEvent wEvent) {
if (wEvent.getState() == KeeperState.SyncConnected) {
msgQ.add("connected");
@@ -276,7 +292,8 @@ class ZookeeperUtil {
List<String> servers = conf.getZookeeper().getServers().getServer();
int serverId = 1;
for (String server : servers) {
- buffer.append("server" + "." + serverId + "=" + server + ":" + leaderConnPort + ":" + leaderElecPort + "\n");
+ buffer.append(
+ "server" + "." + serverId + "=" + server + ":" + leaderConnPort + ":" + leaderElecPort + "\n");
serverId++;
}
AsterixEventServiceUtil.dumpToFile(zooKeeperConfigPath, buffer.toString());
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 4062b23..867c96b 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -35,6 +35,43 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>lexer-generator-maven-plugin</artifactId>
+ <version>0.8.8-SNAPSHOT</version>
+ <configuration>
+ <grammarFile>src/main/resources/adm.grammar</grammarFile>
+ <outputDir>${project.build.directory}/generated-sources/org/apache/asterix/runtime/operators/file/adm</outputDir>
+ </configuration>
+ <executions>
+ <execution>
+ <id>generate-lexer</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>generate-lexer</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources/</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.jvnet.jaxb2.maven2</groupId>
<artifactId>maven-jaxb2-plugin</artifactId>
<version>0.9.0</version>
@@ -91,6 +128,50 @@
</executions>
</plugin>
</plugins>
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId> org.apache.asterix</groupId>
+ <artifactId> lexer-generator-maven-plugin</artifactId>
+ <versionRange>[0.1,)</versionRange>
+ <goals>
+ <goal>generate-lexer</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <execute>
+ <runOnIncremental>false</runOnIncremental>
+ </execute>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId> org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <versionRange>[1.7,)</versionRange>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore />
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
</build>
<dependencies>
<dependency>
@@ -139,6 +220,10 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+ <dependency>
<groupId>net.java.dev.rome</groupId>
<artifactId>rome-fetcher</artifactId>
<version>1.0.0</version>
@@ -186,5 +271,11 @@
<artifactId>jdo2-api</artifactId>
<version>2.3-20090302111651</version>
</dependency>
+ <dependency>
+ <groupId>com.e-movimento.tinytools</groupId>
+ <artifactId>privilegedaccessor</artifactId>
+ <version>1.2.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
deleted file mode 100644
index 8b7b6d5..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.external.dataset.adapter.RSSFeedAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * A factory class for creating the @see {CNNFeedAdapter}.
- */
-public class CNNFeedAdapterFactory implements IFeedAdapterFactory {
- private static final long serialVersionUID = 1L;
-
- private Map<String, String> configuration;
-
- private List<String> feedURLs = new ArrayList<String>();
- private static Map<String, String> topicFeeds = new HashMap<String, String>();
- private ARecordType recordType;
- public static final String KEY_RSS_URL = "topic";
- public static final String KEY_INTERVAL = "interval";
- public static final String TOP_STORIES = "topstories";
- public static final String WORLD = "world";
- public static final String US = "us";
- public static final String SPORTS = "sports";
- public static final String BUSINESS = "business";
- public static final String POLITICS = "politics";
- public static final String CRIME = "crime";
- public static final String TECHNOLOGY = "technology";
- public static final String HEALTH = "health";
- public static final String ENTERNTAINMENT = "entertainemnt";
- public static final String TRAVEL = "travel";
- public static final String LIVING = "living";
- public static final String VIDEO = "video";
- public static final String STUDENT = "student";
- public static final String POPULAR = "popular";
- public static final String RECENT = "recent";
-
- private void initTopics() {
- topicFeeds.put(TOP_STORIES, "http://rss.cnn.com/rss/cnn_topstories.rss");
- topicFeeds.put(WORLD, "http://rss.cnn.com/rss/cnn_world.rss");
- topicFeeds.put(US, "http://rss.cnn.com/rss/cnn_us.rss");
- topicFeeds.put(SPORTS, "http://rss.cnn.com/rss/si_topstories.rss");
- topicFeeds.put(BUSINESS, "http://rss.cnn.com/rss/money_latest.rss");
- topicFeeds.put(POLITICS, "http://rss.cnn.com/rss/cnn_allpolitics.rss");
- topicFeeds.put(CRIME, "http://rss.cnn.com/rss/cnn_crime.rss");
- topicFeeds.put(TECHNOLOGY, "http://rss.cnn.com/rss/cnn_tech.rss");
- topicFeeds.put(HEALTH, "http://rss.cnn.com/rss/cnn_health.rss");
- topicFeeds.put(ENTERNTAINMENT, "http://rss.cnn.com/rss/cnn_showbiz.rss");
- topicFeeds.put(LIVING, "http://rss.cnn.com/rss/cnn_living.rss");
- topicFeeds.put(VIDEO, "http://rss.cnn.com/rss/cnn_freevideo.rss");
- topicFeeds.put(TRAVEL, "http://rss.cnn.com/rss/cnn_travel.rss");
- topicFeeds.put(STUDENT, "http://rss.cnn.com/rss/cnn_studentnews.rss");
- topicFeeds.put(POPULAR, "http://rss.cnn.com/rss/cnn_mostpopular.rss");
- topicFeeds.put(RECENT, "http://rss.cnn.com/rss/cnn_latest.rss");
- }
-
- @Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- RSSFeedAdapter cnnFeedAdapter = new RSSFeedAdapter(configuration, recordType, ctx);
- return cnnFeedAdapter;
- }
-
- @Override
- public String getName() {
- return "cnn_feed";
- }
-
- @Override
- public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
- this.configuration = configuration;
- String rssURLProperty = configuration.get(KEY_RSS_URL);
- if (rssURLProperty == null) {
- throw new IllegalArgumentException("no rss url provided");
- }
- initializeFeedURLs(rssURLProperty);
- this.recordType = outputType;
- }
-
- private void initializeFeedURLs(String rssURLProperty) {
- feedURLs.clear();
- String[] rssTopics = rssURLProperty.split(",");
- initTopics();
- for (String topic : rssTopics) {
- String feedURL = topicFeeds.get(topic);
- if (feedURL == null) {
- throw new IllegalArgumentException(
- " unknown topic :" + topic + " please choose from the following " + getValidTopics());
- }
- feedURLs.add(feedURL);
- }
- }
-
- private static String getValidTopics() {
- StringBuilder builder = new StringBuilder();
- for (String key : topicFeeds.keySet()) {
- builder.append(key);
- builder.append(" ");
- }
- return new String(builder);
- }
-
- @Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return new AlgebricksCountPartitionConstraint(feedURLs.size());
- }
-
- @Override
- public SupportedOperation getSupportedOperations() {
- return SupportedOperation.READ;
- }
-
- @Override
- public ARecordType getAdapterOutputType() {
- return recordType;
- }
-
- @Override
- public boolean isRecordTrackingEnabled() {
- return false;
- }
-
- @Override
- public IIntakeProgressTracker createIntakeProgressTracker() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
new file mode 100644
index 0000000..2e7158d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.adapter.factory;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IIndexibleExternalDataSource;
+import org.apache.asterix.external.api.IIndexingAdapterFactory;
+import org.apache.asterix.external.dataset.adapter.GenericAdapter;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.provider.DataflowControllerProvider;
+import org.apache.asterix.external.provider.DatasourceFactoryProvider;
+import org.apache.asterix.external.provider.ParserFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory {
+
+ private static final long serialVersionUID = 1L;
+ private IExternalDataSourceFactory dataSourceFactory;
+ private IDataParserFactory dataParserFactory;
+ private ARecordType recordType;
+ private Map<String, String> configuration;
+ private List<ExternalFile> files;
+ private boolean indexingOp;
+
+ @Override
+ public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
+ this.files = files;
+ this.indexingOp = indexingOp;
+ }
+
+ @Override
+ public String getAlias() {
+ return ExternalDataConstants.ALIAS_GENERIC_ADAPTER;
+ }
+
+ @Override
+ public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ return dataSourceFactory.getPartitionConstraint();
+ }
+
+ /**
+ * Runs on each node controller (after serialization-deserialization)
+ */
+ @Override
+ public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+ IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
+ dataSourceFactory, dataParserFactory, configuration, indexingOp);
+ return new GenericAdapter(controller);
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+ this.recordType = outputType;
+ this.configuration = configuration;
+ dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
+ dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
+ prepare();
+ ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory);
+ }
+
+ private void prepare() throws Exception {
+ if (dataSourceFactory.isIndexible() && (files != null)) {
+ ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
+ }
+ dataSourceFactory.configure(configuration);
+ dataParserFactory.setRecordType(recordType);
+ dataParserFactory.configure(configuration);
+ }
+
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return recordType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
deleted file mode 100644
index c4a96f4..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.external.dataset.adapter.HDFSAdapter;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.dataflow.HDFSObjectTupleParserFactory;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.ICCContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.hdfs.dataflow.ConfFactory;
-import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
-import org.apache.hyracks.hdfs.scheduler.Scheduler;
-
-/**
- * A factory class for creating instances of HDFSAdapter.
- * configure() builds the Hadoop JobConf, computes the input splits together with a read schedule for
- * them, and selects the tuple parser factory; createAdapter() then creates one HDFSAdapter per partition
- * from that state.
- */
-public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
- private static final long serialVersionUID = 1L;
-
- public static final String HDFS_ADAPTER_NAME = "hdfs";
- public static final String CLUSTER_LOCATIONS = "cluster-locations";
- // NOTE(review): `transient` has no effect on a static field (static state is never serialized), and
- // this value is not final, so any code can reassign it.
- public static transient String SCHEDULER = "hdfs-scheduler";
-
- // Configuration keys understood by this factory, and the values recognised for them
- public static final String KEY_HDFS_URL = "hdfs";
- public static final String KEY_PATH = "path";
- public static final String KEY_INPUT_FORMAT = "input-format";
- public static final String INPUT_FORMAT_TEXT = "text-input-format";
- public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
- // New
- public static final String KEY_PARSER = "parser";
- public static final String PARSER_HIVE = "hive-parser";
- public static final String INPUT_FORMAT_RC = "rc-input-format";
- public static final String FORMAT_BINARY = "binary";
-
- public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
-
- // Hadoop property names constants
- public static final String CLASS_NAME_TEXT_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
- public static final String CLASS_NAME_SEQUENCE_INPUT_FORMAT = "org.apache.hadoop.mapred.SequenceFileInputFormat";
- public static final String CLASS_NAME_RC_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
- public static final String CLASS_NAME_HDFS_FILESYSTEM = "org.apache.hadoop.hdfs.DistributedFileSystem";
- public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
- public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
- public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
- public static final String KEY_HADOOP_INPUT_FORMAT = "mapred.input.format.class";
- public static final String KEY_HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
- public static final String KEY_HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
-
- private transient AlgebricksPartitionConstraint clusterLocations;
- // Location constraints returned by the scheduler for the input splits (presumably one per split)
- private String[] readSchedule;
- // One flag per readSchedule entry, all false after configure(); the same array is handed to every HDFSAdapter
- private boolean executed[];
- private InputSplitsFactory inputSplitsFactory;
- private ConfFactory confFactory;
- private IAType atype;
- private boolean configured = false;
- // Shared scheduler, created lazily by the first configure() call.
- // NOTE(review): the check-then-set on `initialized` is not synchronized; confirm configure() is never
- // called concurrently.
- public static Scheduler hdfsScheduler;
- private static boolean initialized = false;
- // When non-null, splits are restricted to these files (indexed external dataset); see getSplits()
- protected List<ExternalFile> files;
-
- /**
- * Creates the HDFS scheduler for the cluster controller's client address and port.
- *
- * @return the new scheduler
- * @throws IllegalStateException
- * if the scheduler cannot be created (the underlying HyracksException is not kept as the cause)
- */
- private static Scheduler initializeHDFSScheduler() {
- ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
- Scheduler scheduler = null;
- try {
- scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
- ccContext.getClusterControllerInfo().getClientNetPort());
- } catch (HyracksException e) {
- throw new IllegalStateException("Cannot obtain hdfs scheduler");
- }
- return scheduler;
- }
-
- // Maps the short input-format names accepted in the configuration to Hadoop InputFormat class names
- protected static final Map<String, String> formatClassNames = initInputFormatMap();
-
- protected static Map<String, String> initInputFormatMap() {
- Map<String, String> formatClassNames = new HashMap<String, String>();
- formatClassNames.put(INPUT_FORMAT_TEXT, CLASS_NAME_TEXT_INPUT_FORMAT);
- formatClassNames.put(INPUT_FORMAT_SEQUENCE, CLASS_NAME_SEQUENCE_INPUT_FORMAT);
- formatClassNames.put(INPUT_FORMAT_RC, CLASS_NAME_RC_INPUT_FORMAT);
- return formatClassNames;
- }
-
- /**
- * @return the JobConf held by the ConfFactory created in configure()
- * @throws HyracksDataException
- * if the ConfFactory fails to produce the JobConf
- */
- public JobConf getJobConf() throws HyracksDataException {
- return confFactory.getConf();
- }
-
- /**
- * Creates the HDFSAdapter for one partition from the JobConf, input splits, read schedule and executed
- * flags computed in configure(), plus the id of the node the task runs on.
- */
- @Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- JobConf conf = confFactory.getConf();
- InputSplit[] inputSplits = inputSplitsFactory.getSplits();
- String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
- HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, nodeName,
- parserFactory, ctx, configuration, files);
- return hdfsAdapter;
- }
-
- @Override
- public String getName() {
- return HDFS_ADAPTER_NAME;
- }
-
- /**
- * Builds a Hadoop JobConf from the adapter configuration: file system URL, input path, input format
- * (a short name from formatClassNames, otherwise the configured value is used as the class name) and,
- * when a local socket path is given, short-circuit local reads.
- *
- * NOTE(review): throws NullPointerException when KEY_INPUT_FORMAT, KEY_HDFS_URL or KEY_PATH is missing,
- * because their values are trim()med without a null check.
- */
- public static JobConf configureJobConf(Map<String, String> configuration) throws Exception {
- JobConf conf = new JobConf();
- String formatClassName = formatClassNames.get(configuration.get(KEY_INPUT_FORMAT).trim());
- String localShortCircuitSocketPath = configuration.get(KEY_LOCAL_SOCKET_PATH);
- if (formatClassName == null) {
- formatClassName = configuration.get(KEY_INPUT_FORMAT).trim();
- }
- conf.set(KEY_HADOOP_FILESYSTEM_URI, configuration.get(KEY_HDFS_URL).trim());
- conf.set(KEY_HADOOP_FILESYSTEM_CLASS, CLASS_NAME_HDFS_FILESYSTEM);
- conf.setClassLoader(HDFSAdapter.class.getClassLoader());
- conf.set(KEY_HADOOP_INPUT_DIR, configuration.get(KEY_PATH).trim());
- conf.set(KEY_HADOOP_INPUT_FORMAT, formatClassName);
-
- // Enable local short circuit reads if user supplied the parameters
- if (localShortCircuitSocketPath != null) {
- conf.set(KEY_HADOOP_SHORT_CIRCUIT, "true");
- conf.set(KEY_HADOOP_SOCKET_PATH, localShortCircuitSocketPath.trim());
- }
- return conf;
- }
-
- /**
- * @return the reader locations computed in configure()
- * @throws IllegalStateException
- * if configure() has not been called yet
- */
- @Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- if (!configured) {
- throw new IllegalStateException("Adapter factory has not been configured yet");
- }
- return clusterLocations;
- }
-
- /**
- * Configures the factory: creates the shared scheduler on first use, builds the JobConf, computes the
- * input splits and their read schedule, then selects the tuple parser via configureFormat().
- */
- @Override
- public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
- if (!initialized) {
- hdfsScheduler = initializeHDFSScheduler();
- initialized = true;
- }
- this.configuration = configuration;
- JobConf conf = configureJobConf(configuration);
- confFactory = new ConfFactory(conf);
-
- clusterLocations = getClusterLocations();
- // only used as the split-count hint passed to the input format below
- int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
-
- // if files list was set, we restrict the splits to the list since this dataset is indexed
- InputSplit[] inputSplits;
- if (files == null) {
- inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
- } else {
- inputSplits = getSplits(conf);
- }
- inputSplitsFactory = new InputSplitsFactory(inputSplits);
-
- readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
- executed = new boolean[readSchedule.length];
- Arrays.fill(executed, false);
- configured = true;
-
- atype = outputType;
- configureFormat(atype);
- }
-
- @Override
- public SupportedOperation getSupportedOperations() {
- return SupportedOperation.READ;
- }
-
- /**
- * Returns an absolute partition constraint that lists each node twice for every store it has
- * (two reader slots per store).
- */
- public static AlgebricksPartitionConstraint getClusterLocations() {
- ArrayList<String> locs = new ArrayList<String>();
- Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
- for (String i : stores.keySet()) {
- String[] nodeStores = stores.get(i);
- for (int j = 0; j < nodeStores.length; j++) {
- //two readers per partition
- locs.add(i);
- locs.add(i);
- }
- }
- String[] cluster = new String[locs.size()];
- cluster = locs.toArray(cluster);
- return new AlgebricksAbsolutePartitionConstraint(cluster);
- }
-
- @Override
- public ARecordType getAdapterOutputType() {
- return (ARecordType) atype;
- }
-
- /**
- * Always reports UNKNOWN; the concrete parser is selected in configureFormat() from the configuration.
- */
- @Override
- public InputDataFormat getInputDataFormat() {
- return InputDataFormat.UNKNOWN;
- }
-
- /*
- * This method is overridden to do the following:
- * if data is text data (adm or delimited text), it will use a text tuple parser,
- * otherwise it will use hdfs record object parser
- * Only the "binary" format selects the object parser; any other unrecognised format value is passed to
- * AsterixTupleParserFactory as InputDataFormat.UNKNOWN.
- * Throws IllegalArgumentException when no format is configured.
- */
- @Override
- protected void configureFormat(IAType sourceDatatype) throws Exception {
- String specifiedFormat = configuration.get(AsterixTupleParserFactory.KEY_FORMAT);
- if (specifiedFormat == null) {
- throw new IllegalArgumentException(" Unspecified data format");
- }
-
- if (AsterixTupleParserFactory.FORMAT_BINARY.equalsIgnoreCase(specifiedFormat)) {
- parserFactory = new HDFSObjectTupleParserFactory((ARecordType) atype, this, configuration);
- } else {
- InputDataFormat inputFormat = InputDataFormat.UNKNOWN;
- if (AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
- inputFormat = InputDataFormat.DELIMITED;
- } else if (AsterixTupleParserFactory.FORMAT_ADM.equalsIgnoreCase(specifiedFormat)) {
- inputFormat = InputDataFormat.ADM;
- }
- parserFactory = new AsterixTupleParserFactory(configuration, (ARecordType) sourceDatatype, inputFormat);
- }
-
- }
-
- /**
- * Instead of creating the splits using the input format, we do it manually.
- * This function returns file splits (one per HDFS file block) irrespective of the number of partitions,
- * and the produced splits only cover the intersection between the current files in HDFS and the files
- * stored internally in AsterixDB:
- * 1. NoOp means appended file
- * 2. AddOp means new file
- * 3. UpdateOp means the delta of a file
- * For an AddOp entry, splits cover the file up to its recorded size. For a NoOp entry, if another entry
- * with the same name but a different size exists, only the range between the two sizes is split;
- * otherwise the whole recorded size is split.
- * NOTE(review): there is no branch for any other pending op, so such entries produce no splits.
- * Files missing from HDFS, or whose HDFS modification time differs from the recorded one, are skipped.
- * Side effect: {@code files} is replaced by a list holding, for each returned split, the file it was cut
- * from (same order as the returned array).
- *
- * @param conf
- * the job configuration used to obtain the FileSystem
- * @return the file splits
- * @throws IOException
- * if the file system cannot be accessed
- */
- protected InputSplit[] getSplits(JobConf conf) throws IOException {
- ArrayList<FileSplit> fileSplits = new ArrayList<FileSplit>();
- ArrayList<ExternalFile> orderedExternalFiles = new ArrayList<ExternalFile>();
- // Create file system object
- // NOTE(review): FileSystem.get() returns a cached, shared instance by default; closing it here may
- // affect other users of the same FileSystem in this JVM.
- try (FileSystem fs = FileSystem.get(conf)) {
- // Create files splits
- for (ExternalFile file : files) {
- Path filePath = new Path(file.getFileName());
- FileStatus fileStatus;
- try {
- fileStatus = fs.getFileStatus(filePath);
- } catch (FileNotFoundException e) {
- // file was deleted at some point, skip to next file
- continue;
- }
- if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
- && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
- // Get its information from HDFS name node
- BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
- // Create a split per block, truncated at the recorded file size
- for (BlockLocation block : fileBlocks) {
- if (block.getOffset() < file.getSize()) {
- fileSplits.add(new FileSplit(filePath,
- block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
- ? block.getLength() : (file.getSize() - block.getOffset()),
- block.getHosts()));
- orderedExternalFiles.add(file);
- }
- }
- } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
- && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
- long oldSize = 0L;
- long newSize = file.getSize();
- // Look for another entry of the same file with a different size: [oldSize, newSize) is the delta.
- // NOTE(review): `==` compares String references, not contents; this most likely should be
- // equals(). `files` also contains `file` itself, which is skipped only because its size matches.
- for (int i = 0; i < files.size(); i++) {
- if (files.get(i).getFileName() == file.getFileName()
- && files.get(i).getSize() != file.getSize()) {
- newSize = files.get(i).getSize();
- oldSize = file.getSize();
- break;
- }
- }
-
- // Get its information from HDFS name node
- BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
- // Create a split per block
- for (BlockLocation block : fileBlocks) {
- if (block.getOffset() + block.getLength() > oldSize) {
- if (block.getOffset() < newSize) {
- // Block intersects the delta -> create a split trimmed to [oldSize, newSize)
- long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
- long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
- : block.getOffset() + block.getLength() - newSize;
- long splitLength = block.getLength() - startCut - endCut;
- fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
- block.getHosts()));
- orderedExternalFiles.add(file);
- }
- }
- }
- }
- }
- }
- files = orderedExternalFiles;
- return fileSplits.toArray(new FileSplit[fileSplits.size()]);
- }
-
- // Used to tell the factory to restrict the splits to the intersection between this list and the actual files on hdfs side
- public void setFiles(List<ExternalFile> files) {
- this.files = files;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
deleted file mode 100644
index 8bf6d93..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.external.dataset.adapter.HDFSIndexingAdapter;
-import org.apache.asterix.external.indexing.dataflow.HDFSIndexingParserFactory;
-import org.apache.asterix.external.indexing.dataflow.IndexingScheduler;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.api.context.ICCContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.hdfs.dataflow.ConfFactory;
-import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
-
-/**
- * Variant of HDFSAdapterFactory that creates HDFSIndexingAdapter instances (adapter name
- * "hdfs-indexing-adapter"), schedules reads with an IndexingScheduler and always restricts the input
- * splits to the files set via setFiles().
- */
-public class HDFSIndexingAdapterFactory extends HDFSAdapterFactory {
-
- private static final long serialVersionUID = 1L;
-
- // NOTE(review): the fields below duplicate (rather than reuse) the superclass's private state.
- // `configuration` hides the inherited field and `hdfsScheduler` hides the superclass's static field,
- // so code in superclasses does not see values assigned here; confirm this is intended.
- private transient AlgebricksPartitionConstraint clusterLocations;
- private String[] readSchedule;
- private boolean executed[];
- private InputSplitsFactory inputSplitsFactory;
- private ConfFactory confFactory;
- private IAType atype;
- private boolean configured = false;
- public static IndexingScheduler hdfsScheduler;
- private static boolean initialized = false;
- private Map<String, String> configuration;
-
- public static final String HDFS_INDEXING_ADAPTER = "hdfs-indexing-adapter";
-
- /**
- * Creates the indexing scheduler for the cluster controller's client address and port.
- *
- * @throws IllegalStateException
- * if the scheduler cannot be created (the underlying HyracksException is not kept as the cause)
- */
- private static IndexingScheduler initializeHDFSScheduler() {
- ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
- IndexingScheduler scheduler = null;
- try {
- scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
- ccContext.getClusterControllerInfo().getClientNetPort());
- } catch (HyracksException e) {
- throw new IllegalStateException("Cannot obtain hdfs scheduler");
- }
- return scheduler;
- }
-
- @Override
- public SupportedOperation getSupportedOperations() {
- return SupportedOperation.READ;
- }
-
- @Override
- public String getName() {
- return HDFS_INDEXING_ADAPTER;
- }
-
- /**
- * @throws IllegalStateException
- * if configure() has not been called yet
- */
- @Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- if (!configured) {
- throw new IllegalStateException("Adapter factory has not been configured yet");
- }
- return clusterLocations;
- }
-
- /**
- * Creates the HDFSIndexingAdapter for one partition.
- * Note that each call mutates the shared parserFactory (setJobConf / setArguments) before using it.
- */
- @Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- JobConf conf = confFactory.getConf();
- InputSplit[] inputSplits = inputSplitsFactory.getSplits();
- String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
- ((HDFSIndexingParserFactory) parserFactory).setJobConf(conf);
- ((HDFSIndexingParserFactory) parserFactory).setArguments(configuration);
- // the (String) casts below are redundant: configuration is a Map<String, String>
- HDFSIndexingAdapter hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits,
- conf, clusterLocations, files, parserFactory, ctx, nodeName,
- (String) configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT),
- (String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT));
- return hdfsIndexingAdapter;
- }
-
- /**
- * Configures the factory. Unlike the superclass, the splits are always computed with getSplits(), which
- * iterates over {@code files}: setFiles() must therefore be called first (NullPointerException otherwise).
- */
- @Override
- public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
- if (!initialized) {
- hdfsScheduler = initializeHDFSScheduler();
- initialized = true;
- }
- this.configuration = configuration;
- JobConf conf = HDFSAdapterFactory.configureJobConf(configuration);
- confFactory = new ConfFactory(conf);
- clusterLocations = getClusterLocations();
- InputSplit[] inputSplits = getSplits(conf);
- inputSplitsFactory = new InputSplitsFactory(inputSplits);
- readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
- executed = new boolean[readSchedule.length];
- Arrays.fill(executed, false);
- configured = true;
- atype = outputType;
- // The function below is overridden to create an indexing parser factory instead of the regular one
- configureFormat(atype);
- }
-
- /**
- * Creates an HDFSIndexingParserFactory from the configured input format, data format, delimiter,
- * quote and parser name.
- * NOTE(review): the sourceDatatype argument is ignored; the factory is built from the {@code atype} field.
- */
- @Override
- protected void configureFormat(IAType sourceDatatype) throws Exception {
-
- char delimiter = AsterixTupleParserFactory.getDelimiter(configuration);
- char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter);
-
- parserFactory = new HDFSIndexingParserFactory((ARecordType) atype,
- configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT),
- configuration.get(AsterixTupleParserFactory.KEY_FORMAT), delimiter, quote,
- configuration.get(HDFSAdapterFactory.KEY_PARSER));
- }
-
- /**
- * A static function that creates and returns a delimited text data parser
- *
- * @param recordType
- * (the record type to be parsed)
- * @param delimiter
- * (the delimiter value)
- * @param quote
- * (the quote character)
- * @return a DelimitedDataParser with one value parser per field of recordType
- * @throws NotImplementedException
- * if a field is a non-optional UNION, or its type has no entry in valueParserFactoryMap
- */
- public static DelimitedDataParser getDelimitedDataParser(ARecordType recordType, char delimiter, char quote) {
- int n = recordType.getFieldTypes().length;
- IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
- for (int i = 0; i < n; i++) {
- ATypeTag tag = null;
- // optional fields are UNIONs; parse them as their non-null member type
- if (recordType.getFieldTypes()[i].getTypeTag() == ATypeTag.UNION) {
- if (!NonTaggedFormatUtil.isOptional(recordType.getFieldTypes()[i])) {
- throw new NotImplementedException("Non-optional UNION type is not supported.");
- }
- tag = ((AUnionType) recordType.getFieldTypes()[i]).getNullableType().getTypeTag();
- } else {
- tag = recordType.getFieldTypes()[i].getTypeTag();
- }
- if (tag == null) {
- throw new NotImplementedException("Failed to get the type information for field " + i + ".");
- }
- IValueParserFactory vpf = valueParserFactoryMap.get(tag);
- if (vpf == null) {
- throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
- }
- fieldParserFactories[i] = vpf;
- }
- return new DelimitedDataParser(recordType, fieldParserFactories, delimiter, quote, false);
- }
-
- /**
- * Returns an absolute partition constraint listing each node once per store it has (the superclass
- * method, which this static method hides, lists each node twice per store).
- */
- public static AlgebricksPartitionConstraint getClusterLocations() {
- ArrayList<String> locs = new ArrayList<String>();
- Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
- for (String i : stores.keySet()) {
- String[] nodeStores = stores.get(i);
- for (int j = 0; j < nodeStores.length; j++) {
- locs.add(i);
- }
- }
- String[] cluster = new String[locs.size()];
- cluster = locs.toArray(cluster);
- return new AlgebricksAbsolutePartitionConstraint(cluster);
- }
-
- // Value parser per supported field type; any other type is rejected by getDelimitedDataParser()
- private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
-
- private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
- Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
- m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
- m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
- m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
- m.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
- m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
- return m;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java
deleted file mode 100644
index 553682e..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.external.dataset.adapter.HDFSAdapter;
-import org.apache.asterix.external.dataset.adapter.HiveAdapter;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * A factory class for creating instances of HiveAdapter.
- * Hive settings are translated into an HDFS path (see populateConfiguration()), and the actual reading is
- * delegated to an internal HDFSAdapterFactory whose adapters are wrapped in HiveAdapter.
- */
-public class HiveAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
- private static final long serialVersionUID = 1L;
-
- public static final String HIVE_DATABASE = "database";
- public static final String HIVE_TABLE = "table";
- public static final String HIVE_HOME = "hive-home";
- public static final String HIVE_METASTORE_URI = "metastore-uri";
- public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
- public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
-
- private HDFSAdapterFactory hdfsAdapterFactory;
- // Last adapter created by createAdapter(); overwritten on every call
- private HDFSAdapter hdfsAdapter;
- // NOTE(review): never set to true in this class, so the guard in configure() has no effect
- private boolean configured = false;
- private IAType atype;
-
- public HiveAdapterFactory() {
- hdfsAdapterFactory = new HDFSAdapterFactory();
- }
-
- /**
- * Creates an HDFSAdapter via the delegate factory and wraps it in a HiveAdapter.
- * NOTE(review): this class never assigns the inherited parserFactory in the visible code; confirm it is
- * initialized elsewhere before this is called.
- */
- @Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- hdfsAdapter = (HDFSAdapter) hdfsAdapterFactory.createAdapter(ctx, partition);
- HiveAdapter hiveAdapter = new HiveAdapter(atype, hdfsAdapter, parserFactory, ctx);
- return hiveAdapter;
- }
-
- @Override
- public String getName() {
- return "hive";
- }
-
- @Override
- public SupportedOperation getSupportedOperations() {
- return SupportedOperation.READ;
- }
-
- /**
- * Rewrites the configuration for HDFS (see populateConfiguration()) and configures the delegate
- * HDFSAdapterFactory with it.
- */
- @Override
- public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
- if (!configured) {
- populateConfiguration(configuration);
- hdfsAdapterFactory.configure(configuration, outputType);
- this.atype = outputType;
- }
- }
-
- /**
- * Derives the HDFS input path from the Hive warehouse directory, database and table, and stores it in the
- * given map under HDFSAdapterFactory.KEY_PATH (the caller's map is modified in place). Only the
- * delimited-text format and the text/sequence input formats are accepted.
- *
- * NOTE(review): when a database is configured, the path is built from {@code tablePath} (still null at
- * that point) instead of {@code database}, producing ".../null.db/...". The error messages also lack a
- * space after "format" / "file input format". A missing format or input-format key causes a
- * NullPointerException rather than an IllegalArgumentException.
- *
- * @throws IllegalArgumentException
- * if the format or input format is not supported
- */
- public static void populateConfiguration(Map<String, String> configuration) throws Exception {
- /** configure hive */
- String database = configuration.get(HIVE_DATABASE);
- String tablePath = null;
- if (database == null) {
- tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
- } else {
- tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
- + configuration.get(HIVE_TABLE);
- }
- configuration.put(HDFSAdapterFactory.KEY_PATH, tablePath);
- if (!configuration.get(AsterixTupleParserFactory.KEY_FORMAT)
- .equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
- throw new IllegalArgumentException(
- "format" + configuration.get(AsterixTupleParserFactory.KEY_FORMAT) + " is not supported");
- }
-
- if (!(configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)
- || configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT)
- .equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))) {
- throw new IllegalArgumentException(
- "file input format" + configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) + " is not supported");
- }
- }
-
- @Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return hdfsAdapterFactory.getPartitionConstraint();
- }
-
- @Override
- public ARecordType getAdapterOutputType() {
- return (ARecordType) atype;
- }
-
- @Override
- public InputDataFormat getInputDataFormat() {
- return InputDataFormat.UNKNOWN;
- }
-
- // Forwards the file snapshot to the delegate HDFSAdapterFactory; must precede configure() to take effect
- public void setFiles(List<ExternalFile> files) {
- hdfsAdapterFactory.setFiles(files);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java
deleted file mode 100644
index b8005cd..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Base interface for adapter factories, i.e. serializable factories that create IDatasourceAdapter
- * instances for an external data source.
- * (Originally described as the base of IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.)
- */
-public interface IAdapterFactory extends Serializable {
-
- public static final String KEY_TYPE_NAME = "type-name";
-
- public enum SupportedOperation {
- READ,
- WRITE,
- READ_WRITE
- }
-
- /**
- * Returns the type of adapter indicating if the adapter can be used for
- * reading from an external data source or writing to an external data
- * source or can be used for both purposes.
- *
- * @see SupportedOperation
- * @return the operations supported by adapters created by this factory
- */
- public SupportedOperation getSupportedOperations();
-
- /**
- * Returns the display name corresponding to the Adapter type that is created by the factory.
- *
- * @return the display name
- */
- public String getName();
-
- /**
- * Gets a list of partition constraints. A partition constraint can be a
- * requirement to execute at a particular location or could be cardinality
- * constraints indicating the number of instances that need to run in
- * parallel. For example, an IDatasourceAdapter implementation written for data
- * residing on the local file system of a node cannot run on any other node
- * and thus has a location partition constraint. The location partition
- * constraint can be expressed as a node IP address or a node controller id.
- * In the former case, the IP address is translated to a node controller id
- * running on the node with the given IP address.
- */
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
-
- /**
- * Creates an instance of IDatasourceAdapter.
- *
- * @param ctx
- * the Hyracks task context of the task that will run the adapter
- * @param partition
- * the partition the adapter instance serves
- * @return An instance of IDatasourceAdapter.
- * @throws Exception
- */
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception;
-
- /**
- * Configures the factory. Implementations such as HDFSAdapterFactory expect this to be called before
- * getPartitionConstraint() and createAdapter().
- *
- * @param configuration
- * the adapter configuration (key/value pairs)
- * @param outputType
- * the record type produced by the adapter
- * @throws Exception
- */
- public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception;
-
- /**
- * Gets the record type associated with the output of the adapter
- *
- * @return the adapter's output record type
- */
- public ARecordType getAdapterOutputType();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java
deleted file mode 100644
index 0de6fad..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public interface IControlledAdapterFactory extends Serializable {
- public IControlledAdapter createAdapter(IHyracksTaskContext ctx, ExternalFileIndexAccessor fileIndexAccessor,
- RecordDescriptor inRecDesc);
-
- public void configure(IAType atype, boolean propagateInput, int[] ridFields,
- Map<String, String> adapterConfiguration, boolean retainNull);
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java
deleted file mode 100644
index 9358a52..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-
-public interface IFeedAdapterFactory extends IAdapterFactory {
-
- public boolean isRecordTrackingEnabled();
-
- public IIntakeProgressTracker createIntakeProgressTracker();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
new file mode 100644
index 0000000..866910b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.adapter.factory;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.asterix.external.api.ILookupReaderFactory;
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.dataset.adapter.LookupAdapter;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordIdReader;
+import org.apache.asterix.external.indexing.RecordIdReaderFactory;
+import org.apache.asterix.external.input.record.reader.LookupReaderFactoryProvider;
+import org.apache.asterix.external.provider.ParserFactoryProvider;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Serializable factory for {@link LookupAdapter} instances.
+ * <p>
+ * {@link #configure(Map)} resolves a lookup record-reader factory and a record-parser factory from the
+ * adapter configuration; {@link #createAdapter} then combines a parser, a reader and a
+ * {@link RecordIdReader} into a new adapter on every call.
+ *
+ * @param <T> record type shared by the lookup record reader and the record parser
+ */
+// rawtypes/unchecked: the reader and parser factories are held as raw types and cast to T-typed results.
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class LookupAdapterFactory<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // Fixed at construction time.
+    private final ARecordType recordType;
+    private final int[] ridFields;
+    private final boolean retainInput;
+    private final boolean retainNull;
+    private final INullWriterFactory nullWriterFactory;
+
+    // Populated by configure(Map); createAdapter dereferences them, so configure must run first.
+    private Map<String, String> configuration;
+    private ILookupReaderFactory readerFactory;
+    private IRecordDataParserFactory dataParserFactory;
+
+    /**
+     * @param recordType type handed to the record parser factory and to every created parser
+     * @param ridFields indexes of the record-id fields in the input tuples; every other input field is
+     *            reported to the adapter as a propagated field
+     * @param retainInput forwarded unchanged to each created {@link LookupAdapter}
+     * @param retainNull forwarded unchanged to each created {@link LookupAdapter}
+     * @param nullWriterFactory forwarded unchanged to each created {@link LookupAdapter}
+     */
+    public LookupAdapterFactory(ARecordType recordType, int[] ridFields, boolean retainInput, boolean retainNull,
+            INullWriterFactory nullWriterFactory) {
+        this.recordType = recordType;
+        this.ridFields = ridFields;
+        this.retainInput = retainInput;
+        this.retainNull = retainNull;
+        this.nullWriterFactory = nullWriterFactory;
+    }
+
+    /**
+     * Creates a lookup adapter for one partition. {@link #configure(Map)} must have been called first.
+     *
+     * @param ctx task context used to create the record parser and the lookup record reader
+     * @param partition partition index passed to the lookup reader factory
+     * @param inRecDesc descriptor of the input tuples; its non-record-id fields become the propagated fields
+     * @param snapshotAccessor file index accessor passed to the lookup reader factory
+     * @param writer frame writer forwarded to the adapter
+     * @return a new adapter that receives its own propagated-field array
+     * @throws HyracksDataException wrapping any failure while building the adapter's components
+     */
+    public LookupAdapter<T> createAdapter(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecDesc,
+            ExternalFileIndexAccessor snapshotAccessor, IFrameWriter writer) throws HyracksDataException {
+        try {
+            IRecordDataParser<T> dataParser = dataParserFactory.createRecordParser(ctx);
+            dataParser.configure(configuration, recordType);
+            ILookupRecordReader<? extends T> reader = readerFactory.createRecordReader(ctx, partition,
+                    snapshotAccessor);
+            reader.configure(configuration);
+            RecordIdReader ridReader = RecordIdReaderFactory.create(configuration, ridFields);
+            // Kept local instead of in a field: if several partitions share this factory instance, a shared
+            // field could be overwritten by a concurrent call while this array is still being filled.
+            int[] propagatedFields = computePropagatedFields(inRecDesc.getFieldCount(), ridFields);
+            return new LookupAdapter<T>(dataParser, reader, inRecDesc, ridReader, retainInput, propagatedFields,
+                    retainNull, nullWriterFactory, ctx, writer);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Resolves and configures the lookup reader factory and the record parser factory. The configuration map
+     * is retained and later passed to every parser, reader and record-id reader built by
+     * {@link #createAdapter}.
+     *
+     * @param configuration adapter properties used to select and configure both factories
+     * @throws Exception if either factory cannot be resolved or configured
+     */
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        readerFactory = LookupReaderFactoryProvider.getLookupReaderFactory(configuration);
+        dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(configuration);
+        // NOTE(review): the record type is set before configure(); presumably configure relies on it - confirm
+        // before reordering these calls.
+        dataParserFactory.setRecordType(recordType);
+        readerFactory.configure(configuration);
+        dataParserFactory.configure(configuration);
+    }
+
+    /**
+     * Returns, in ascending order, every field index in {@code [0, fieldCount)} that is not a record-id field.
+     * Duplicate record-id indexes are counted once.
+     *
+     * @throws IllegalArgumentException if a record-id index lies outside {@code [0, fieldCount)}
+     */
+    private static int[] computePropagatedFields(int fieldCount, int[] ridFields) {
+        boolean[] isRidField = new boolean[fieldCount];
+        int ridCount = 0;
+        for (int ridField : ridFields) {
+            if (ridField < 0 || ridField >= fieldCount) {
+                throw new IllegalArgumentException(
+                        "Record id field " + ridField + " is outside the input fields [0, " + fieldCount + ")");
+            }
+            if (!isRidField[ridField]) {
+                isRidField[ridField] = true;
+                ridCount++;
+            }
+        }
+        int[] propagated = new int[fieldCount - ridCount];
+        int next = 0;
+        for (int field = 0; field < fieldCount; field++) {
+            if (!isRidField[field]) {
+                propagated[next++] = field;
+            }
+        }
+        return propagated;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
deleted file mode 100644
index 251d69a..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.util.DNSResolverFactory;
-import org.apache.asterix.external.util.INodeResolver;
-import org.apache.asterix.external.util.INodeResolverFactory;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-
-/**
- * Factory class for creating an instance of NCFileSystemAdapter. An
- * NCFileSystemAdapter reads external data residing on the local file system of
- * an NC.
- */
-public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
- private static final long serialVersionUID = 1L;
-
- public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
-
- private static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver();
-
- private IAType sourceDatatype;
- private FileSplit[] fileSplits;
- private ARecordType outputType;
-
- @Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(fileSplits, parserFactory, sourceDatatype, ctx);
- return fsAdapter;
- }
-
- @Override
- public String getName() {
- return NC_FILE_SYSTEM_ADAPTER_NAME;
- }
-
- @Override
- public SupportedOperation getSupportedOperations() {
- return SupportedOperation.READ;
- }
-
- @Override
- public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
- this.configuration = configuration;
- this.outputType = outputType;
- String[] splits = configuration.get(AsterixTupleParserFactory.KEY_PATH).split(",");
- IAType sourceDatatype = outputType;
- configureFileSplits(splits);
- configureFormat(sourceDatatype);
-
- }
-
- @Override
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return configurePartitionConstraint();
- }
-
- private void configureFileSplits(String[] splits) throws AsterixException {
- if (fileSplits == null) {
- fileSplits = new FileSplit[splits.length];
- String nodeName;
- String nodeLocalPath;
- int count = 0;
- String trimmedValue;
- for (String splitPath : splits) {
- trimmedValue = splitPath.trim();
- if (!trimmedValue.contains("://")) {
- throw new AsterixException(
- "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
- }
- nodeName = trimmedValue.split(":")[0];
- nodeLocalPath = trimmedValue.split("://")[1];
- FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
- fileSplits[count++] = fileSplit;
- }
- }
- }
-
- private AlgebricksPartitionConstraint configurePartitionConstraint() throws AsterixException {
- String[] locs = new String[fileSplits.length];
- String location;
- for (int i = 0; i < fileSplits.length; i++) {
- location = getNodeResolver().resolveNode(fileSplits[i].getNodeName());
- locs[i] = location;
- }
- return new AlgebricksAbsolutePartitionConstraint(locs);
- }
-
- protected INodeResolver getNodeResolver() {
- if (nodeResolver == null) {
- nodeResolver = initializeNodeResolver();
- }
- return nodeResolver;
- }
-
- private static INodeResolver initializeNodeResolver() {
- INodeResolver nodeResolver = null;
- String configuredNodeResolverFactory = System
- .getProperty(AsterixTupleParserFactory.NODE_RESOLVER_FACTORY_PROPERTY);
- if (configuredNodeResolverFactory != null) {
- try {
- nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
- .createNodeResolver();
-
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "Unable to create node resolver from the configured classname "
- + configuredNodeResolverFactory + "\n" + e.getMessage());
- }
- nodeResolver = DEFAULT_NODE_RESOLVER;
- }
- } else {
- nodeResolver = DEFAULT_NODE_RESOLVER;
- }
- return nodeResolver;
- }
-
- @Override
- public ARecordType getAdapterOutputType() {
- return outputType;
- }
-
- @Override
- public InputDataFormat getInputDataFormat() {
- return InputDataFormat.UNKNOWN;
- }
-
- public void setFiles(List<ExternalFile> files) throws AlgebricksException {
- throw new AlgebricksException("can't set files for this Adapter");
- }
-
-}