You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:02 UTC

[08/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
deleted file mode 100644
index db38c12..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.runtime;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.hyracks.api.comm.IFrameWriter;
-
-/**
- * Adapter that copies the bytes of a local file to a receiver socket on 127.0.0.1 at the
- * configured port. The {@link IFrameWriter} passed to {@link #start} is not used.
- */
-public class SocketClientAdapter implements IDataSourceAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOGGER = Logger.getLogger(SocketClientAdapter.class.getName());
-
-    private static final String LOCALHOST = "127.0.0.1";
-
-    /** Pause between connection attempts, in milliseconds (passed to Thread.sleep). */
-    private static final long RECONNECT_PERIOD = 2000;
-
-    /** Path of the file whose content is streamed. */
-    private final String localFile;
-
-    /** Port on 127.0.0.1 the receiver is expected to listen on. */
-    private final int port;
-
-    /**
-     * Cleared by stop(). Volatile because start() blocks while streaming, so stop() is
-     * presumably invoked from a different thread and the change must become visible.
-     */
-    private volatile boolean continueStreaming = true;
-
-    /**
-     * @param port
-     *            port of the local receiver; must not be null (it is unboxed here)
-     * @param localFile
-     *            path of the file to stream
-     */
-    public SocketClientAdapter(Integer port, String localFile) {
-        this.localFile = localFile;
-        this.port = port;
-    }
-
-    /**
-     * Connects to the receiver and writes the file to it in 1 KB chunks until end of file or
-     * until stop() is called. Returns without streaming if stop() is called before a
-     * connection could be established.
-     *
-     * @param partition
-     *            not used by this adapter
-     * @param writer
-     *            not used by this adapter
-     * @throws Exception
-     *             if the file cannot be opened or read, the socket cannot be written, or the
-     *             thread is interrupted while waiting for the receiver
-     */
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        Socket socket = waitForReceiver();
-        if (socket == null) {
-            // stop() was called while still waiting for the receiver.
-            return;
-        }
-        try {
-            // Opened inside the try so that a missing file cannot leak the connected socket.
-            FileInputStream fin = new FileInputStream(new File(localFile));
-            try {
-                OutputStream os = socket.getOutputStream();
-                byte[] chunk = new byte[1024];
-                int read;
-                while (continueStreaming && (read = fin.read(chunk)) > 0) {
-                    os.write(chunk, 0, read);
-                }
-            } finally {
-                // Closed in its own finally so a failing socket.close() cannot skip it.
-                fin.close();
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Finished streaming file " + localFile + " to port [" + port + "]");
-            }
-        } finally {
-            socket.close();
-        }
-    }
-
-    /**
-     * Retries the connection every RECONNECT_PERIOD milliseconds until it succeeds or stop()
-     * is called. Only I/O failures are retried; other failures (for example an out-of-range
-     * port) propagate instead of being retried forever.
-     *
-     * @return the connected socket, or null if stop() was called before a connection was made
-     * @throws Exception
-     *             if the thread is interrupted while sleeping, or on a non-I/O failure
-     */
-    private Socket waitForReceiver() throws Exception {
-        Socket socket = null;
-        while (socket == null && continueStreaming) {
-            try {
-                socket = new Socket(LOCALHOST, port);
-            } catch (java.io.IOException e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Receiver not ready, would wait for " + (RECONNECT_PERIOD / 1000)
-                            + " seconds before reconnecting");
-                }
-                Thread.sleep(RECONNECT_PERIOD);
-            }
-        }
-        return socket;
-    }
-
-    /**
-     * Asks the streaming loop (and the connection retry loop) to end.
-     *
-     * @return always true
-     */
-    @Override
-    public boolean stop() throws Exception {
-        continueStreaming = false;
-        return true;
-    }
-
-    /**
-     * This adapter does not attempt to recover from failures.
-     *
-     * @return always false
-     */
-    @Override
-    public boolean handleException(Throwable e) {
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
deleted file mode 100644
index a1e90a8..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.runtime;
-
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Factory for {@link SocketClientAdapter}. Each partition gets the port of the socket that the
- * wrapped {@link GenericSocketFeedAdapterFactory} reports for that partition, and the file path
- * at the same index of the "file_splits" configuration value.
- */
-public class SocketClientAdapterFactory implements IAdapterFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private ARecordType outputType;
-
-    /** Configured alongside this factory; supplies the partition constraint and the sockets. */
-    private GenericSocketFeedAdapterFactory genericSocketAdapterFactory;
-
-    /** One file path per partition, in partition order. */
-    private String[] fileSplits;
-
-    public static final String KEY_FILE_SPLITS = "file_splits";
-
-    /**
-     * Reads the comma separated list of file paths from the configuration and configures the
-     * wrapped socket feed factory with the same configuration.
-     *
-     * @throws IllegalArgumentException
-     *             if the configuration has no "file_splits" entry
-     */
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        this.outputType = outputType;
-        String fileSplitsValue = configuration.get(KEY_FILE_SPLITS);
-        if (fileSplitsValue == null) {
-            throw new IllegalArgumentException(
-                    "File splits not specified. File split is specified as a comma separated list of paths");
-        }
-        // Trim every entry, not just the whole value, so "a, b" does not yield the path " b".
-        String[] rawSplits = fileSplitsValue.trim().split(",");
-        String[] trimmedSplits = new String[rawSplits.length];
-        for (int i = 0; i < rawSplits.length; i++) {
-            trimmedSplits[i] = rawSplits[i].trim();
-        }
-        fileSplits = trimmedSplits;
-        genericSocketAdapterFactory = new GenericSocketFeedAdapterFactory();
-        genericSocketAdapterFactory.configure(configuration, outputType);
-    }
-
-    @Override
-    public String getAlias() {
-        return ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER;
-    }
-
-    /** Delegates to the wrapped socket feed factory. */
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return genericSocketAdapterFactory.getPartitionConstraint();
-    }
-
-    /**
-     * Creates the adapter for one partition.
-     *
-     * @throws IllegalArgumentException
-     *             if no file split was configured for the requested partition
-     */
-    @Override
-    public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        if (partition < 0 || partition >= fileSplits.length) {
-            throw new IllegalArgumentException("No file split configured for partition " + partition + " ("
-                    + fileSplits.length + " file split(s) specified)");
-        }
-        Pair<String, Integer> socket = genericSocketAdapterFactory.getSockets().get(partition);
-        // socket.second is handed to the adapter as the port to connect to.
-        return new SocketClientAdapter(socket.second, fileSplits[partition]);
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return outputType;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
deleted file mode 100644
index b5fd454..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.runtime;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.runtime.DataGenerator.InitializationInfo;
-import org.apache.asterix.external.runtime.DataGenerator.TweetMessage;
-import org.apache.asterix.external.runtime.DataGenerator.TweetMessageIterator;
-
-/**
- * Produces tweets from a {@link DataGenerator}, renders each one as a line of text and
- * broadcasts the text, buffered in chunks of up to 32 KB, to every registered subscriber stream.
- * A subscriber whose stream fails on write is dropped.
- */
-public class TweetGenerator {
-
-    private static final Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
-
-    public static final String KEY_DURATION = "duration";
-    public static final String KEY_TPS = "tps";
-    public static final String KEY_VERBOSE = "verbose";
-    public static final String KEY_FIELDS = "fields";
-    public static final int INFINITY = 0;
-
-    private static final int DEFAULT_DURATION = INFINITY;
-
-    /** Capacity of the output buffer in bytes. */
-    private static final int BUFFER_SIZE = 32 * 1024;
-
-    private int duration;
-    private TweetMessageIterator tweetIterator = null;
-    private int partition;
-    /** Number of tweets generated so far, whether or not they have been flushed yet. */
-    private long tweetCount = 0;
-    /** Number of tweets currently sitting in the output buffer. */
-    private int frameTweetCount = 0;
-    /** Number of tweets that have been handed to the subscribers. */
-    private int numFlushedTweets = 0;
-    private DataGenerator dataGenerator = null;
-    private ByteBuffer outputBuffer = ByteBuffer.allocate(BUFFER_SIZE);
-    /** Field names passed to TweetMessage.getAdmEquivalent; null when "fields" is not configured. */
-    private String[] fields;
-    private final List<OutputStream> subscribers;
-    /** Guards every access to {@link #subscribers}. */
-    private final Object lock = new Object();
-
-    /**
-     * @param configuration
-     *            may contain "duration" (an integer; defaults to {@link #INFINITY}) and "fields"
-     *            (a comma separated list)
-     * @param partition
-     *            partition number, used only in log output
-     * @throws Exception
-     *             if "duration" is not a valid integer or the data generator cannot be created
-     */
-    public TweetGenerator(Map<String, String> configuration, int partition) throws Exception {
-        this.partition = partition;
-        String value = configuration.get(KEY_DURATION);
-        this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
-        dataGenerator = new DataGenerator(new InitializationInfo());
-        tweetIterator = dataGenerator.new TweetMessageIterator(duration);
-        String fieldsValue = configuration.get(KEY_FIELDS);
-        this.fields = fieldsValue != null ? fieldsValue.split(",") : null;
-        this.subscribers = new ArrayList<OutputStream>();
-    }
-
-    /**
-     * Appends one tweet, followed by a newline, to the output buffer. The buffer is flushed
-     * first when the tweet does not fit in the remaining space.
-     */
-    private void writeTweetString(TweetMessage tweetMessage) throws IOException {
-        String tweet = tweetMessage.getAdmEquivalent(fields) + "\n";
-        // NOTE(review): prints every tweet to stdout unconditionally; KEY_VERBOSE is declared but
-        // never consulted in this class. Confirm whether this should be gated or removed.
-        System.out.println(tweet);
-        tweetCount++;
-        // NOTE(review): uses the platform default charset; confirm what the receivers expect.
-        byte[] b = tweet.getBytes();
-        if (b.length > outputBuffer.remaining()) {
-            flush();
-        }
-        if (b.length > outputBuffer.capacity()) {
-            // Can never fit in the buffer (put() would throw BufferOverflowException), so hand
-            // the bytes to the subscribers directly.
-            broadcast(b, b.length);
-            numFlushedTweets++;
-            return;
-        }
-        outputBuffer.put(b);
-        frameTweetCount++;
-    }
-
-    /**
-     * Sends the buffered bytes to all subscribers, empties the buffer and moves the buffered
-     * tweets into the flushed count. Doing the accounting here keeps the count correct for
-     * every flush, including the final one issued from generateNextBatch.
-     */
-    private void flush() throws IOException {
-        outputBuffer.flip();
-        broadcast(outputBuffer.array(), outputBuffer.limit());
-        outputBuffer.clear();
-        numFlushedTweets += frameTweetCount;
-        frameTweetCount = 0;
-    }
-
-    /**
-     * Writes the first {@code length} bytes of {@code data} to every subscriber. A subscriber
-     * whose write fails is removed so that it does not affect the remaining subscribers.
-     */
-    private void broadcast(byte[] data, int length) {
-        synchronized (lock) {
-            List<OutputStream> failed = new ArrayList<OutputStream>();
-            for (OutputStream os : subscribers) {
-                try {
-                    os.write(data, 0, length);
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.log(Level.WARNING, "Dropping subscriber after failed write", e);
-                    }
-                    failed.add(os);
-                }
-            }
-            // Removed after the loop; removing while iterating would fail.
-            if (!failed.isEmpty()) {
-                subscribers.removeAll(failed);
-            }
-        }
-    }
-
-    /**
-     * Generates up to {@code numTweets} tweets. When the iterator has no more data, flushes
-     * whatever is still buffered instead.
-     *
-     * @return false when the iterator was already exhausted on entry, true otherwise
-     */
-    public boolean generateNextBatch(int numTweets) throws Exception {
-        if (!tweetIterator.hasNext()) {
-            if (outputBuffer.position() > 0) {
-                flush();
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount);
-            }
-            return false;
-        }
-        for (int count = 0; count < numTweets; count++) {
-            // hasNext() was already checked above for the first tweet; re-check before each
-            // further one so next() is never called on an exhausted iterator.
-            if (count > 0 && !tweetIterator.hasNext()) {
-                break;
-            }
-            writeTweetString(tweetIterator.next());
-        }
-        return true;
-    }
-
-    public int getNumFlushedTweets() {
-        return numFlushedTweets;
-    }
-
-    /** Adds a stream that will receive every subsequently flushed chunk. */
-    public void registerSubscriber(OutputStream os) {
-        synchronized (lock) {
-            subscribers.add(os);
-        }
-    }
-
-    /** Removes the given stream from the subscribers; the stream itself is not closed. */
-    public void deregisterSubscribers(OutputStream os) {
-        synchronized (lock) {
-            subscribers.remove(os);
-        }
-    }
-
-    /**
-     * Closes every subscriber stream. All streams are attempted even if one fails.
-     *
-     * @throws IOException
-     *             the first failure encountered, rethrown after all streams were attempted
-     */
-    public void close() throws IOException {
-        IOException firstFailure = null;
-        synchronized (lock) {
-            for (OutputStream os : subscribers) {
-                try {
-                    os.close();
-                } catch (IOException e) {
-                    if (firstFailure == null) {
-                        firstFailure = e;
-                    }
-                }
-            }
-        }
-        if (firstFailure != null) {
-            throw firstFailure;
-        }
-    }
-
-    /** @return true when at least one subscriber is registered */
-    public boolean isSubscribed() {
-        synchronized (lock) {
-            return !subscribers.isEmpty();
-        }
-    }
-
-    public long getTweetCount() {
-        return tweetCount;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
deleted file mode 100644
index f8585bb..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.util;
-
-import org.apache.asterix.external.api.INodeResolver;
-import org.apache.asterix.external.api.INodeResolverFactory;
-
-/**
- * {@link INodeResolverFactory} implementation backed by a single {@link NodeResolver}: every
- * call to {@link #createNodeResolver()} returns the same instance.
- */
-public class DNSResolverFactory implements INodeResolverFactory {
-
-    /** The one resolver handed out by this factory; created when the class is initialized. */
-    private static final INodeResolver SHARED_RESOLVER = new NodeResolver();
-
-    /**
-     * @return the shared resolver; no new object is created per call
-     */
-    @Override
-    public INodeResolver createNodeResolver() {
-        return SHARED_RESOLVER;
-    }
-
-}