You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/17 22:27:50 UTC

[17/53] [abbrv] git commit: adding backup job adding hdfs writer config

adding backup job
adding hdfs writer config


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5d0f6bc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5d0f6bc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5d0f6bc4

Branch: refs/heads/master
Commit: 5d0f6bc422bb68c245c75a1c005568575324b06b
Parents: 9e757ae
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Mar 30 16:55:18 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Mar 30 16:55:18 2014 -0500

----------------------------------------------------------------------
 .../apache/streams/hdfs/HdfsConfigurator.java   | 12 ++++++++++
 .../streams/hdfs/WebHdfsPersistWriter.java      | 23 ++++++++++++--------
 2 files changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5d0f6bc4/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
index dfbc273..f9790e8 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
@@ -44,4 +44,16 @@ public class HdfsConfigurator {
         return hdfsReaderConfiguration;
     }
 
+    public static HdfsWriterConfiguration detectWriterConfiguration(Config hdfs) {
+        // Build the base configuration via detectConfiguration, then convert it to the writer-specific type.
+        HdfsConfiguration hdfsConfiguration = detectConfiguration(hdfs);
+        HdfsWriterConfiguration hdfsWriterConfiguration  = mapper.convertValue(hdfsConfiguration, HdfsWriterConfiguration.class);
+
+        String writerPath = hdfs.getString("writerPath");
+        // Apply the "writerPath" value read above (NOTE(review): key is presumably required — confirm getString behavior when it is absent).
+        hdfsWriterConfiguration.setWriterPath(writerPath);
+
+        return hdfsWriterConfiguration;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5d0f6bc4/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 55c55b3..12f9a5e 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -3,21 +3,16 @@ package org.apache.streams.hdfs;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
-import com.typesafe.config.Config;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.streams.hdfs.HdfsConfiguration;
-
 import java.io.Closeable;
 import java.io.Flushable;
 import java.io.IOException;
@@ -25,10 +20,12 @@ import java.io.OutputStreamWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Queue;
 
-public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable
+public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable
 {
     public final static String STREAMS_ID = "WebHdfsPersistWriter";
 
@@ -264,4 +261,12 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
             e.printStackTrace();
         }
     }
+
+    @Override
+    public DatumStatusCounter getDatumStatusCounter() {
+        DatumStatusCounter counters = new DatumStatusCounter(); // a new counter object is created on every call
+        counters.incrementAttempt(this.totalRecordsWritten); // attempts are reported as totalRecordsWritten
+        counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten); // the same total is reported as SUCCESS; no other status is set here
+        return counters;
+    }
 }