You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:02 UTC
[08/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
deleted file mode 100644
index db38c12..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.runtime;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.hyracks.api.comm.IFrameWriter;
-
-/**
- * Adapter that copies the raw bytes of a local file to a socket opened on
- * 127.0.0.1 at the configured port.
- * <p>
- * {@link #start(int, IFrameWriter)} blocks until a receiver accepts the
- * connection and the file has been sent (or {@link #stop()} is called).
- */
-public class SocketClientAdapter implements IDataSourceAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOGGER = Logger.getLogger(SocketClientAdapter.class.getName());
-
-    private static final String LOCALHOST = "127.0.0.1";
-
-    /** Pause, in milliseconds, between two connection attempts. */
-    private static final long RECONNECT_PERIOD = 2000;
-
-    /** Number of bytes read from the file and written to the socket per iteration. */
-    private static final int CHUNK_SIZE = 1024;
-
-    private final String localFile;
-
-    private final int port;
-
-    // volatile: stop() flips this flag while start() is looping; presumably the two
-    // run on different threads, so the write must be visible to the streaming loop.
-    private volatile boolean continueStreaming = true;
-
-    /**
-     * @param port
-     *            local port the receiver listens on
-     * @param localFile
-     *            path of the file whose content is sent
-     */
-    public SocketClientAdapter(Integer port, String localFile) {
-        this.localFile = localFile;
-        this.port = port;
-    }
-
-    /**
-     * Connects to the receiver and streams the file to it in
-     * {@value #CHUNK_SIZE}-byte chunks until end of file or until
-     * {@link #stop()} is called. Neither {@code partition} nor {@code writer}
-     * is used by this adapter.
-     *
-     * @throws Exception
-     *             if the file cannot be read, the socket cannot be written, or
-     *             the thread is interrupted while waiting for the receiver
-     */
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        Socket socket = waitForReceiver();
-        if (socket == null) {
-            // stop() was requested before any receiver accepted the connection.
-            return;
-        }
-        // Nested try/finally so the socket is released even when opening the
-        // file fails, and the file is released even when closing the socket fails.
-        try {
-            OutputStream os = socket.getOutputStream();
-            FileInputStream fin = new FileInputStream(new File(localFile));
-            try {
-                byte[] chunk = new byte[CHUNK_SIZE];
-                int read;
-                while (continueStreaming && (read = fin.read(chunk)) > 0) {
-                    os.write(chunk, 0, read);
-                }
-            } finally {
-                fin.close();
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Finished streaming file " + localFile + " to port [" + port + "]");
-            }
-        } finally {
-            socket.close();
-        }
-    }
-
-    /**
-     * Retries the connection every {@link #RECONNECT_PERIOD} milliseconds until
-     * it succeeds or {@link #stop()} is called.
-     * <p>
-     * Only I/O failures are retried; anything else (for example an
-     * out-of-range port) propagates instead of looping forever.
-     *
-     * @return the connected socket, or {@code null} if the adapter was stopped
-     *         before a connection could be made
-     * @throws InterruptedException
-     *             if the thread is interrupted while sleeping between attempts
-     */
-    private Socket waitForReceiver() throws InterruptedException {
-        while (continueStreaming) {
-            try {
-                return new Socket(LOCALHOST, port);
-            } catch (IOException e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Receiver not ready, would wait for " + (RECONNECT_PERIOD / 1000)
-                            + " seconds before reconnecting");
-                }
-                Thread.sleep(RECONNECT_PERIOD);
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Asks the streaming loop (and the reconnect loop) to finish at its next
-     * iteration.
-     *
-     * @return always {@code true}
-     */
-    @Override
-    public boolean stop() throws Exception {
-        continueStreaming = false;
-        return true;
-    }
-
-    /**
-     * @return always {@code false}
-     */
-    @Override
-    public boolean handleException(Throwable e) {
-        return false;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
deleted file mode 100644
index a1e90a8..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.runtime;
-
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Factory for {@link SocketClientAdapter} instances. Each partition is paired
- * with one entry of the {@value #KEY_FILE_SPLITS} configuration value and with
- * the port of the socket that the wrapped
- * {@link GenericSocketFeedAdapterFactory} reports for the same partition.
- */
-public class SocketClientAdapterFactory implements IAdapterFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    /** Configuration key holding the comma separated list of file paths. */
-    public static final String KEY_FILE_SPLITS = "file_splits";
-
-    private ARecordType outputType;
-
-    private GenericSocketFeedAdapterFactory genericSocketAdapterFactory;
-
-    private String[] fileSplits;
-
-    /**
-     * Reads the file splits from {@code configuration} and configures the
-     * wrapped socket adapter factory with the same arguments.
-     *
-     * @throws IllegalArgumentException
-     *             if {@value #KEY_FILE_SPLITS} is missing or blank
-     */
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        this.outputType = outputType;
-        String fileSplitsValue = configuration.get(KEY_FILE_SPLITS);
-        // A blank value would otherwise produce a single empty path.
-        if (fileSplitsValue == null || fileSplitsValue.trim().isEmpty()) {
-            throw new IllegalArgumentException(
-                    "File splits not specified. File split is specified as a comma separated list of paths");
-        }
-        // Swallow whitespace around the commas so "a, b" yields "a" and "b" rather than "a" and " b".
-        fileSplits = fileSplitsValue.trim().split("\\s*,\\s*");
-        genericSocketAdapterFactory = new GenericSocketFeedAdapterFactory();
-        genericSocketAdapterFactory.configure(configuration, outputType);
-    }
-
-    @Override
-    public String getAlias() {
-        return ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER;
-    }
-
-    /**
-     * @return the partition constraint of the wrapped socket adapter factory
-     */
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return genericSocketAdapterFactory.getPartitionConstraint();
-    }
-
-    /**
-     * Creates the adapter for {@code partition} from the partition-th socket
-     * and the partition-th file split.
-     *
-     * @throws ArrayIndexOutOfBoundsException
-     *             if no file split was configured for {@code partition}
-     */
-    @Override
-    public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        Pair<String, Integer> socket = genericSocketAdapterFactory.getSockets().get(partition);
-        if (partition < 0 || partition >= fileSplits.length) {
-            // Same exception type the raw array access would raise, but with enough context to fix the configuration.
-            throw new ArrayIndexOutOfBoundsException("No file split configured for partition " + partition + "; "
-                    + fileSplits.length + " file split(s) specified under \"" + KEY_FILE_SPLITS + "\"");
-        }
-        return new SocketClientAdapter(socket.second, fileSplits[partition]);
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return outputType;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
deleted file mode 100644
index b5fd454..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.runtime;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.runtime.DataGenerator.InitializationInfo;
-import org.apache.asterix.external.runtime.DataGenerator.TweetMessage;
-import org.apache.asterix.external.runtime.DataGenerator.TweetMessageIterator;
-
-/**
- * Produces tweet records from a {@link DataGenerator}, accumulates their text
- * form in a fixed-size buffer and writes each full buffer to every registered
- * subscriber stream.
- * <p>
- * Access to the subscriber list is guarded by an internal lock; the counters
- * and the buffer are not.
- */
-public class TweetGenerator {
-
-    private static final Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
-
-    public static final String KEY_DURATION = "duration";
-    public static final String KEY_TPS = "tps";
-    public static final String KEY_VERBOSE = "verbose";
-    public static final String KEY_FIELDS = "fields";
-    public static final int INFINITY = 0;
-
-    private static final int DEFAULT_DURATION = INFINITY;
-
-    /** Capacity, in bytes, of the buffer that batches tweets before they are sent. */
-    private static final int BUFFER_SIZE = 32 * 1024;
-
-    private final int duration;
-    private final TweetMessageIterator tweetIterator;
-    private final int partition;
-    private long tweetCount = 0;
-    // Tweets currently sitting in outputBuffer, i.e. written since the last flush.
-    private int frameTweetCount = 0;
-    private int numFlushedTweets = 0;
-    private final DataGenerator dataGenerator;
-    private final ByteBuffer outputBuffer = ByteBuffer.allocate(BUFFER_SIZE);
-    private final String[] fields;
-    private final List<OutputStream> subscribers = new ArrayList<OutputStream>();
-    private final Object lock = new Object();
-
-    /**
-     * @param configuration
-     *            may contain {@value #KEY_DURATION} (an integer, defaults to
-     *            {@link #INFINITY}) and {@value #KEY_FIELDS} (comma separated
-     *            list, may be absent)
-     * @param partition
-     *            partition number, only used in log output
-     */
-    public TweetGenerator(Map<String, String> configuration, int partition) throws Exception {
-        this.partition = partition;
-        String value = configuration.get(KEY_DURATION);
-        this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
-        dataGenerator = new DataGenerator(new InitializationInfo());
-        tweetIterator = dataGenerator.new TweetMessageIterator(duration);
-        String fieldList = configuration.get(KEY_FIELDS);
-        this.fields = fieldList != null ? fieldList.split(",") : null;
-    }
-
-    /**
-     * Appends one tweet (followed by a newline) to the buffer, flushing the
-     * buffer first when the tweet does not fit in the remaining space.
-     */
-    private void writeTweetString(TweetMessage tweetMessage) throws IOException {
-        String tweet = tweetMessage.getAdmEquivalent(fields) + "\n";
-        // NOTE(review): every tweet is echoed to stdout unconditionally; looks like debug output - confirm.
-        System.out.println(tweet);
-        tweetCount++;
-        // NOTE(review): platform default charset is used here - confirm what the receiving side expects.
-        byte[] b = tweet.getBytes();
-        if (b.length > outputBuffer.remaining()) {
-            flush();
-        }
-        // NOTE(review): a single tweet larger than BUFFER_SIZE would still overflow the buffer here.
-        outputBuffer.put(b);
-        frameTweetCount++;
-    }
-
-    /**
-     * Writes the buffered bytes to every subscriber, drops subscribers whose
-     * write failed, empties the buffer and moves the buffered tweets into the
-     * flushed count.
-     * <p>
-     * The counters are updated here, not by the callers, so that the final
-     * flush issued from {@link #generateNextBatch(int)} is counted as well.
-     */
-    private void flush() throws IOException {
-        int length = outputBuffer.position();
-        synchronized (lock) {
-            List<OutputStream> failed = new ArrayList<OutputStream>();
-            for (OutputStream os : subscribers) {
-                try {
-                    os.write(outputBuffer.array(), 0, length);
-                } catch (Exception e) {
-                    // Best effort: one broken subscriber must not stop delivery to the others.
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Removing subscriber after failed write: " + e);
-                    }
-                    failed.add(os);
-                }
-            }
-            subscribers.removeAll(failed);
-        }
-        outputBuffer.clear();
-        numFlushedTweets += frameTweetCount;
-        frameTweetCount = 0;
-    }
-
-    /**
-     * Generates {@code numTweets} tweets if the iterator reports more data;
-     * otherwise flushes whatever is still buffered.
-     *
-     * @return {@code true} if a batch was generated, {@code false} if the
-     *         iterator had no more data when the method was entered
-     */
-    public boolean generateNextBatch(int numTweets) throws Exception {
-        if (!tweetIterator.hasNext()) {
-            if (outputBuffer.position() > 0) {
-                flush();
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount);
-            }
-            return false;
-        }
-        // NOTE(review): hasNext() is only consulted once per batch, not per tweet - confirm this is intended.
-        for (int count = 0; count < numTweets; count++) {
-            writeTweetString(tweetIterator.next());
-        }
-        return true;
-    }
-
-    /**
-     * @return number of tweets that have been handed to {@code flush()} so far
-     */
-    public int getNumFlushedTweets() {
-        return numFlushedTweets;
-    }
-
-    public void registerSubscriber(OutputStream os) {
-        synchronized (lock) {
-            subscribers.add(os);
-        }
-    }
-
-    public void deregisterSubscribers(OutputStream os) {
-        synchronized (lock) {
-            subscribers.remove(os);
-        }
-    }
-
-    /**
-     * Closes every registered subscriber stream. All streams are attempted
-     * even if some fail to close.
-     *
-     * @throws IOException
-     *             the first failure encountered, after all streams were tried
-     */
-    public void close() throws IOException {
-        IOException firstFailure = null;
-        synchronized (lock) {
-            for (OutputStream os : subscribers) {
-                try {
-                    os.close();
-                } catch (IOException e) {
-                    if (firstFailure == null) {
-                        firstFailure = e;
-                    }
-                }
-            }
-        }
-        if (firstFailure != null) {
-            throw firstFailure;
-        }
-    }
-
-    /**
-     * @return {@code true} if at least one subscriber is currently registered
-     */
-    public boolean isSubscribed() {
-        // Same lock as every other access to the subscriber list.
-        synchronized (lock) {
-            return !subscribers.isEmpty();
-        }
-    }
-
-    /**
-     * @return number of tweets generated so far, flushed or not
-     */
-    public long getTweetCount() {
-        return tweetCount;
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
deleted file mode 100644
index f8585bb..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util;
-
-import org.apache.asterix.external.api.INodeResolver;
-import org.apache.asterix.external.api.INodeResolverFactory;
-
-/**
- * {@link INodeResolverFactory} implementation backed by one shared
- * {@link NodeResolver}.
- */
-public class DNSResolverFactory implements INodeResolverFactory {
-
-    // Built once when this class is initialized; handed out by every call below.
-    private static final INodeResolver SHARED_RESOLVER = new NodeResolver();
-
-    /**
-     * @return the shared resolver; the same object on every call
-     */
-    @Override
-    public INodeResolver createNodeResolver() {
-        return SHARED_RESOLVER;
-    }
-
-}