Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/05 23:22:46 UTC

svn commit: r1574695 - in /incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs: WebHdfsPersistReader.java WebHdfsPersistReaderTask.java

Author: sblackmon
Date: Wed Mar  5 22:22:46 2014
New Revision: 1574695

URL: http://svn.apache.org/r1574695
Log:
missing two files

Added:
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java?rev=1574695&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java Wed Mar  5 22:22:46 2014
@@ -0,0 +1,196 @@
+package org.apache.streams.hdfs;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.*;
+
+/**
+ * Created by sblackmon on 2/28/14.
+ */
+public class WebHdfsPersistReader implements StreamsPersistReader {
+
+    public final static String STREAMS_ID = "WebHdfsPersistReader";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReader.class);
+
+    protected final static char DELIMITER = '\t';
+
+    protected FileSystem client;
+    protected Path path;
+    protected FileStatus[] status;
+
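+    // Filled by WebHdfsPersistReaderTask (via startStream) or by readAll(),
+    // and drained by readCurrent()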
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private HdfsReaderConfiguration hdfsConfiguration;
+
+    private ExecutorService executor;
+
+    public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
+        this.hdfsConfiguration = hdfsConfiguration;
+    }
+
+    public URI getURI() throws URISyntaxException { return new URI(WebHdfsFileSystem.SCHEME + "://" + hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort()); }
+
+    public boolean isConnected() { return (client != null); }
+
+    public final synchronized FileSystem getFileSystem()
+    {
+        // Check to see if we are connected.
+        if(!isConnected())
+            connectToWebHDFS();
+        return this.client;
+    }
+
+    private synchronized void connectToWebHDFS()
+    {
+        try
+        {
+            LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
+            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
+            ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
+
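+            // Create the FileSystem inside the remote user's context so that
+            // WebHDFS requests are issued as the configured user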
+            ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                public Void run() throws Exception {
+                    Configuration conf = new Configuration();
+                    // Use simple auth to match the SIMPLE method set on the UGI above
+                    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple");
+                    LOGGER.info("WebURI : {}", getURI().toString());
+                    client = FileSystem.get(getURI(), conf);
+                    LOGGER.info("Connected to WebHDFS");
+
+                    /*
+                    * ************************************************************************************************
+                    * This code is an example of how you would work with HDFS if you weren't going
+                    * over the WebHDFS protocol.
+                    *
+                    * Smashew: 2013-10-01
+                    * ************************************************************************************************
+                    conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName);
+                    conf.set("namenode.host","0.0.0.0");
+                    conf.set("hadoop.job.ugi", userName);
+                    conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner");
+                    fileSystem.createNewFile(new Path("/user/"+ userName + "/test"));
+                    FileStatus[] status = fs.listStatus(new Path("/user/" + userName));
+                    for(int i=0;i<status.length;i++)
+                    {
+                        LOGGER.info("Directory: {}", status[i].getPath());
+                    }
+                    */
+                    return null;
+                }
+            });
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", e);
+        }
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        LOGGER.debug("Prepare");
+        connectToWebHDFS();
+        path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath());
+        try {
+            status = client.listStatus(path);
+        } catch (IOException e) {
+            LOGGER.error("Unable to list status for {}", path, e);
+        }
+        persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+        executor = Executors.newSingleThreadExecutor();
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+    @Override
+    public StreamsResultSet readAll() {
+        readSourceWritePersistQueue();
+        return new StreamsResultSet(persistQueue);
+    }
+
+    @Override
+    public void startStream() {
+        LOGGER.debug("startStream");
+        executor.submit(new WebHdfsPersistReaderTask(this));
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+
+        LOGGER.debug("readCurrent: {}", persistQueue.size());
+
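+        // Snapshot everything queued so far, then clear, so each call to
+        // readCurrent() returns only datums accumulated since the last call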
+        Collection<StreamsDatum> currentIterator = Lists.newArrayList();
+        Iterators.addAll(currentIterator, persistQueue.iterator());
+
+        StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
+
+        persistQueue.clear();
+
+        return current;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    private void readSourceWritePersistQueue() {
+        for( FileStatus fileStatus : status ) {
+            // Skip directories and Hadoop job markers such as _SUCCESS
+            if( fileStatus.isFile() && !fileStatus.getPath().getName().endsWith("_SUCCESS")) {
+                try (BufferedReader reader = new BufferedReader(new InputStreamReader(client.open(fileStatus.getPath())))) {
+                    String line;
+                    while( (line = reader.readLine()) != null ) {
+                        String[] fields = line.split(Character.toString(DELIMITER));
+                        // The serialized document is expected in the fourth tab-delimited field
+                        if( fields.length > 3 )
+                            persistQueue.offer(new StreamsDatum(fields[3]));
+                    }
+                } catch (IOException e) {
+                    LOGGER.error("Failed reading {}", fileStatus.getPath(), e);
+                }
+            }
+        }
+    }
+}
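
For anyone reviewing: a minimal sketch of driving the reader above in batch mode. The HdfsReaderConfiguration setter names and value types are assumptions inferred from the getters used in this file, since the generated configuration class is not part of this commit:

    // Hypothetical configuration values; setter names inferred from the getters above
    HdfsReaderConfiguration config = new HdfsReaderConfiguration();
    config.setHost("namenode.example.com");
    config.setPort(50070L);
    config.setUser("hdfs");
    config.setPath("/user/hdfs/streams");
    config.setReaderPath("2014-03-05");

    WebHdfsPersistReader reader = new WebHdfsPersistReader(config);
    reader.prepare(null);                       // connects and lists the reader path
    StreamsResultSet result = reader.readAll(); // reads every file under the path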

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java?rev=1574695&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java Wed Mar  5 22:22:46 2014
@@ -0,0 +1,54 @@
+package org.apache.streams.hdfs;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+public class WebHdfsPersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReaderTask.class);
+
+    private WebHdfsPersistReader reader;
+
+    public WebHdfsPersistReaderTask(WebHdfsPersistReader reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    public void run() {
+
+        for( FileStatus fileStatus : reader.status ) {
+            // Skip directories and Hadoop job markers such as _SUCCESS
+            if( fileStatus.isFile() && !fileStatus.getPath().getName().endsWith("_SUCCESS")) {
+                try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(reader.client.open(fileStatus.getPath())))) {
+                    String line;
+                    while( (line = bufferedReader.readLine()) != null ) {
+                        try {
+                            String[] fields = line.split(Character.toString(WebHdfsPersistReader.DELIMITER));
+                            // The serialized document is expected in the fourth tab-delimited field
+                            if( fields.length > 3 )
+                                reader.persistQueue.offer(new StreamsDatum(fields[3]));
+                        } catch (Exception e) {
+                            LOGGER.warn("Failed processing " + line, e);
+                        }
+                    }
+                } catch (IOException e) {
+                    LOGGER.error("Failed reading {}", fileStatus.getPath(), e);
+                }
+            }
+        }
+
+    }
+
+}
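
Both run() above and readSourceWritePersistQueue() in WebHdfsPersistReader split each line on the tab DELIMITER and take fields[3], so the serialized document is expected in the fourth field. What the first three fields hold is an assumption here, since the matching persist writer is not part of this commit, but a line would look roughly like:

    id0001<TAB>1394058166000<TAB>metadata<TAB>{"verb":"post","actor":{"id":"..."}}

Lines with fewer than four fields are skipped. Because run() streams datums onto reader.persistQueue as files are read, consumers can poll readCurrent() while the background task is still working.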