You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/29 18:55:34 UTC
svn commit: r1162900 - in
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume:
conf/Configurables.java source/NetcatSource.java
source/SequenceGeneratorSource.java
Author: esammer
Date: Mon Aug 29 16:55:34 2011
New Revision: 1162900
URL: http://svn.apache.org/viewvc?rev=1162900&view=rev
Log:
- Added static methods for checking optional and required context parameters and
updated sanity checks in configure() methods of some sources.
- Added WAL support to SequenceGeneratorSource.
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java?rev=1162900&r1=1162899&r2=1162900&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java Mon Aug 29 16:55:34 2011
@@ -26,4 +26,26 @@ public class Configurables {
return false;
}
+ public static void ensureRequiredNonNull(Context context, String... keys) {
+ for (String key : keys) {
+ if (!context.getParameters().containsKey(key)
+ || context.getParameters().get(key) == null) {
+
+ throw new IllegalArgumentException("Required parameter " + key
+ + " must exist and may not be null");
+ }
+ }
+ }
+
+ public static void ensureOptionalNonNull(Context context, String... keys) {
+ for (String key : keys) {
+ if (context.getParameters().containsKey(key)
+ && context.getParameters().get(key) == null) {
+
+ throw new IllegalArgumentException("Optional parameter " + key
+ + " may not be null");
+ }
+ }
+ }
+
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1162900&r1=1162899&r2=1162900&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Mon Aug 29 16:55:34 2011
@@ -14,6 +14,7 @@ import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.Configurables;
import org.apache.flume.durability.WALManager;
import org.apache.flume.durability.WALWriter;
import org.apache.flume.event.EventBuilder;
@@ -21,8 +22,6 @@ import org.apache.flume.lifecycle.Lifecy
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
public class NetcatSource extends AbstractEventSource implements Configurable {
private static final Logger logger = LoggerFactory
@@ -43,14 +42,11 @@ public class NetcatSource extends Abstra
@Override
public void configure(Context context) {
- String nodeName = context.get("logicalNode.name", String.class);
- String port = context.get("source.port", String.class);
-
- Preconditions.checkArgument(nodeName != null, "Node name may not be null");
- Preconditions.checkArgument(port != null, "Source port may not be null");
+ Configurables.ensureRequiredNonNull(context, "logicalNode.name",
+ "source.port");
- this.nodeName = nodeName;
- this.port = Integer.parseInt(port);
+ nodeName = context.get("logicalNode.name", String.class);
+ port = Integer.parseInt(context.get("source.port", String.class));
}
@Override
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1162900&r1=1162899&r2=1162900&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java Mon Aug 29 16:55:34 2011
@@ -1,13 +1,49 @@
package org.apache.flume.source;
+import java.io.IOException;
+
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.durability.WAL;
+import org.apache.flume.durability.WALManager;
+import org.apache.flume.durability.WALWriter;
import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.lifecycle.LifecycleException;
+
+public class SequenceGeneratorSource extends AbstractEventSource implements
+ Configurable {
-public class SequenceGeneratorSource extends AbstractEventSource {
+ private String nodeName;
private long sequence;
+ private WALManager walManager;
+ private WALWriter walWriter;
+
+ @Override
+ public void open(Context context) throws LifecycleException {
+ if (walManager != null) {
+ WAL wal = walManager.getWAL(nodeName);
+ try {
+ walWriter = wal.getWriter();
+ } catch (IOException e) {
+ throw new LifecycleException(e);
+ }
+ }
+ }
+
+ @Override
+ public void close(Context context) throws LifecycleException {
+ if (walWriter != null) {
+ try {
+ walWriter.close();
+ } catch (IOException e) {
+ throw new LifecycleException(e);
+ }
+ }
+ }
@Override
public Event next(Context context) throws InterruptedException,
@@ -17,7 +53,32 @@ public class SequenceGeneratorSource ext
event.setBody(Long.valueOf(sequence++).toString().getBytes());
+ if (walWriter != null) {
+ try {
+ walWriter.write(event);
+ walWriter.flush();
+ } catch (IOException e) {
+ throw new EventDeliveryException("Unable to write event to WAL via "
+ + walWriter + ". Exception follows.", e);
+ }
+ }
+
return event;
}
+ @Override
+ public void configure(Context context) {
+ Configurables.ensureRequiredNonNull(context, "logicalNode.name");
+
+ nodeName = context.get("logicalNode.name", String.class);
+ }
+
+ public WALManager getWALManager() {
+ return walManager;
+ }
+
+ public void setWALManager(WALManager walManager) {
+ this.walManager = walManager;
+ }
+
}