You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/17 22:27:50 UTC
[17/53] [abbrv] git commit: adding backup job adding hdfs writer config
adding backup job
adding hdfs writer config
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5d0f6bc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5d0f6bc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5d0f6bc4
Branch: refs/heads/master
Commit: 5d0f6bc422bb68c245c75a1c005568575324b06b
Parents: 9e757ae
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Mar 30 16:55:18 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Mar 30 16:55:18 2014 -0500
----------------------------------------------------------------------
.../apache/streams/hdfs/HdfsConfigurator.java | 12 ++++++++++
.../streams/hdfs/WebHdfsPersistWriter.java | 23 ++++++++++++--------
2 files changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5d0f6bc4/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
index dfbc273..f9790e8 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
@@ -44,4 +44,16 @@ public class HdfsConfigurator {
return hdfsReaderConfiguration;
}
+ public static HdfsWriterConfiguration detectWriterConfiguration(Config hdfs) {
+
+ HdfsConfiguration hdfsConfiguration = detectConfiguration(hdfs);
+ HdfsWriterConfiguration hdfsWriterConfiguration = mapper.convertValue(hdfsConfiguration, HdfsWriterConfiguration.class);
+
+ String writerPath = hdfs.getString("writerPath");
+
+ hdfsWriterConfiguration.setWriterPath(writerPath);
+
+ return hdfsWriterConfiguration;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5d0f6bc4/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 55c55b3..12f9a5e 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -3,21 +3,16 @@ package org.apache.streams.hdfs;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
-import com.typesafe.config.Config;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.streams.hdfs.HdfsConfiguration;
-
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
@@ -25,10 +20,12 @@ import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Queue;
-public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable
+public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable
{
public final static String STREAMS_ID = "WebHdfsPersistWriter";
@@ -264,4 +261,12 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
e.printStackTrace();
}
}
+
+ @Override
+ public DatumStatusCounter getDatumStatusCounter() {
+ DatumStatusCounter counters = new DatumStatusCounter();
+ counters.incrementAttempt(this.totalRecordsWritten);
+ counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten);
+ return counters;
+ }
}