You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/25 20:50:08 UTC

svn commit: r1161690 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java

Author: esammer
Date: Thu Aug 25 18:50:08 2011
New Revision: 1161690

URL: http://svn.apache.org/viewvc?rev=1161690&view=rev
Log:
- Used NetcatSource as a guinea pig for WAL support. Works, but unsure if this is clean enough.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1161690&r1=1161689&r2=1161690&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Thu Aug 25 18:50:08 2011
@@ -13,26 +13,48 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.durability.WALManager;
+import org.apache.flume.durability.WALWriter;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NetcatSource extends AbstractEventSource {
+import com.google.common.base.Preconditions;
+
+public class NetcatSource extends AbstractEventSource implements Configurable {
 
   private static final Logger logger = LoggerFactory
       .getLogger(NetcatSource.class);
 
   private int port;
+  private String nodeName;
   private ServerSocketChannel serverSocket;
   private CounterGroup counterGroup;
 
+  private WALManager walManager;
+  private WALWriter walWriter;
+
   public NetcatSource() {
     port = 0;
     counterGroup = new CounterGroup();
   }
 
   @Override
-  public void open(Context context) {
+  public void configure(Context context) {
+    String nodeName = context.get("logicalNode.name", String.class);
+    Integer port = context.get("source.port", Integer.class);
+
+    Preconditions.checkArgument(nodeName != null, "Node name may not be null");
+    Preconditions.checkArgument(port != null, "Source port may not be null");
+
+    this.nodeName = nodeName;
+    this.port = port;
+  }
+
+  @Override
+  public void open(Context context) throws LifecycleException {
     counterGroup.incrementAndGet("open.attempts");
 
     try {
@@ -47,6 +69,17 @@ public class NetcatSource extends Abstra
       counterGroup.incrementAndGet("open.errors");
       logger.error("Unable to bind to socket. Exception follows.", e);
     }
+
+    if (walManager != null) {
+      logger.debug("Event durability features enabled. Using WALManager:{}",
+          walManager);
+      try {
+        walWriter = walManager.getWAL(nodeName).getWriter();
+      } catch (IOException e) {
+        throw new LifecycleException(
+            "Unable to get WAL writer. Exception follows.", e);
+      }
+    }
   }
 
   @Override
@@ -80,6 +113,11 @@ public class NetcatSource extends Abstra
 
       event = EventBuilder.withBody(builder.toString().getBytes());
 
+      if (walWriter != null) {
+        walWriter.write(event);
+        walWriter.flush();
+      }
+
       channel.close();
 
       counterGroup.incrementAndGet("events.success");
@@ -94,7 +132,7 @@ public class NetcatSource extends Abstra
   }
 
   @Override
-  public void close(Context context) {
+  public void close(Context context) throws LifecycleException {
     if (serverSocket != null) {
       try {
         serverSocket.close();
@@ -102,6 +140,17 @@ public class NetcatSource extends Abstra
         logger.error("Unable to close socket. Exception follows.", e);
       }
     }
+
+    if (walWriter != null) {
+      try {
+        walWriter.flush();
+        walWriter.close();
+      } catch (IOException e) {
+        throw new LifecycleException(
+            "Unable to flush WAL on close - POTENTIAL DATA LOSS! Exception follows.",
+            e);
+      }
+    }
   }
 
   public int getPort() {
@@ -112,4 +161,12 @@ public class NetcatSource extends Abstra
     this.port = port;
   }
 
+  public WALManager getWALManager() {
+    return walManager;
+  }
+
+  public void setWALManager(WALManager walManager) {
+    this.walManager = walManager;
+  }
+
 }