You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/29 18:55:34 UTC

svn commit: r1162900 - in /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume: conf/Configurables.java source/NetcatSource.java source/SequenceGeneratorSource.java

Author: esammer
Date: Mon Aug 29 16:55:34 2011
New Revision: 1162900

URL: http://svn.apache.org/viewvc?rev=1162900&view=rev
Log:
- Added static methods for checking optional and required context parameters and
  updated sanity checks in configure() methods of some sources.
- Added WAL support to SequenceGeneratorSource.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java?rev=1162900&r1=1162899&r2=1162900&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java Mon Aug 29 16:55:34 2011
@@ -26,4 +26,26 @@ public class Configurables {
     return false;
   }
 
+  public static void ensureRequiredNonNull(Context context, String... keys) {
+    for (String key : keys) {
+      if (!context.getParameters().containsKey(key)
+          || context.getParameters().get(key) == null) {
+
+        throw new IllegalArgumentException("Required parameter " + key
+            + " must exist and may not be null");
+      }
+    }
+  }
+
+  public static void ensureOptionalNonNull(Context context, String... keys) {
+    for (String key : keys) {
+      if (context.getParameters().containsKey(key)
+          && context.getParameters().get(key) == null) {
+
+        throw new IllegalArgumentException("Optional parameter " + key
+            + " may not be null");
+      }
+    }
+  }
+
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1162900&r1=1162899&r2=1162900&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Mon Aug 29 16:55:34 2011
@@ -14,6 +14,7 @@ import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.Configurables;
 import org.apache.flume.durability.WALManager;
 import org.apache.flume.durability.WALWriter;
 import org.apache.flume.event.EventBuilder;
@@ -21,8 +22,6 @@ import org.apache.flume.lifecycle.Lifecy
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 public class NetcatSource extends AbstractEventSource implements Configurable {
 
   private static final Logger logger = LoggerFactory
@@ -43,14 +42,11 @@ public class NetcatSource extends Abstra
 
   @Override
   public void configure(Context context) {
-    String nodeName = context.get("logicalNode.name", String.class);
-    String port = context.get("source.port", String.class);
-
-    Preconditions.checkArgument(nodeName != null, "Node name may not be null");
-    Preconditions.checkArgument(port != null, "Source port may not be null");
+    Configurables.ensureRequiredNonNull(context, "logicalNode.name",
+        "source.port");
 
-    this.nodeName = nodeName;
-    this.port = Integer.parseInt(port);
+    nodeName = context.get("logicalNode.name", String.class);
+    port = Integer.parseInt(context.get("source.port", String.class));
   }
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1162900&r1=1162899&r2=1162900&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java Mon Aug 29 16:55:34 2011
@@ -1,13 +1,49 @@
 package org.apache.flume.source;
 
+import java.io.IOException;
+
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.durability.WAL;
+import org.apache.flume.durability.WALManager;
+import org.apache.flume.durability.WALWriter;
 import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.lifecycle.LifecycleException;
+
+public class SequenceGeneratorSource extends AbstractEventSource implements
+    Configurable {
 
-public class SequenceGeneratorSource extends AbstractEventSource {
+  private String nodeName;
 
   private long sequence;
+  private WALManager walManager;
+  private WALWriter walWriter;
+
+  @Override
+  public void open(Context context) throws LifecycleException {
+    if (walManager != null) {
+      WAL wal = walManager.getWAL(nodeName);
+      try {
+        walWriter = wal.getWriter();
+      } catch (IOException e) {
+        throw new LifecycleException(e);
+      }
+    }
+  }
+
+  @Override
+  public void close(Context context) throws LifecycleException {
+    if (walWriter != null) {
+      try {
+        walWriter.close();
+      } catch (IOException e) {
+        throw new LifecycleException(e);
+      }
+    }
+  }
 
   @Override
   public Event next(Context context) throws InterruptedException,
@@ -17,7 +53,32 @@ public class SequenceGeneratorSource ext
 
     event.setBody(Long.valueOf(sequence++).toString().getBytes());
 
+    if (walWriter != null) {
+      try {
+        walWriter.write(event);
+        walWriter.flush();
+      } catch (IOException e) {
+        throw new EventDeliveryException("Unable to write event to WAL via "
+            + walWriter + ". Exception follows.", e);
+      }
+    }
+
     return event;
   }
 
+  @Override
+  public void configure(Context context) {
+    Configurables.ensureRequiredNonNull(context, "logicalNode.name");
+
+    nodeName = context.get("logicalNode.name", String.class);
+  }
+
+  public WALManager getWALManager() {
+    return walManager;
+  }
+
+  public void setWALManager(WALManager walManager) {
+    this.walManager = walManager;
+  }
+
 }