You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/25 20:50:08 UTC
svn commit: r1161690 -
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
Author: esammer
Date: Thu Aug 25 18:50:08 2011
New Revision: 1161690
URL: http://svn.apache.org/viewvc?rev=1161690&view=rev
Log:
- Used NetcatSource as a guinea pig for write-ahead log (WAL) support. It works, but I am unsure whether the approach is clean enough.
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1161690&r1=1161689&r2=1161690&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Thu Aug 25 18:50:08 2011
@@ -13,26 +13,48 @@ import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.durability.WALManager;
+import org.apache.flume.durability.WALWriter;
import org.apache.flume.event.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NetcatSource extends AbstractEventSource {
+import com.google.common.base.Preconditions;
+
+public class NetcatSource extends AbstractEventSource implements Configurable {
private static final Logger logger = LoggerFactory
.getLogger(NetcatSource.class);
private int port;
+ private String nodeName;
private ServerSocketChannel serverSocket;
private CounterGroup counterGroup;
+ private WALManager walManager;
+ private WALWriter walWriter;
+
public NetcatSource() {
port = 0;
counterGroup = new CounterGroup();
}
@Override
- public void open(Context context) {
+ public void configure(Context context) {
+ String nodeName = context.get("logicalNode.name", String.class);
+ Integer port = context.get("source.port", Integer.class);
+
+ Preconditions.checkArgument(nodeName != null, "Node name may not be null");
+ Preconditions.checkArgument(port != null, "Source port may not be null");
+
+ this.nodeName = nodeName;
+ this.port = port;
+ }
+
+ @Override
+ public void open(Context context) throws LifecycleException {
counterGroup.incrementAndGet("open.attempts");
try {
@@ -47,6 +69,17 @@ public class NetcatSource extends Abstra
counterGroup.incrementAndGet("open.errors");
logger.error("Unable to bind to socket. Exception follows.", e);
}
+
+ if (walManager != null) {
+ logger.debug("Event durability features enabled. Using WALManager:{}",
+ walManager);
+ try {
+ walWriter = walManager.getWAL(nodeName).getWriter();
+ } catch (IOException e) {
+ throw new LifecycleException(
+ "Unable to get WAL writer. Exception follows.", e);
+ }
+ }
}
@Override
@@ -80,6 +113,11 @@ public class NetcatSource extends Abstra
event = EventBuilder.withBody(builder.toString().getBytes());
+ if (walWriter != null) {
+ walWriter.write(event);
+ walWriter.flush();
+ }
+
channel.close();
counterGroup.incrementAndGet("events.success");
@@ -94,7 +132,7 @@ public class NetcatSource extends Abstra
}
@Override
- public void close(Context context) {
+ public void close(Context context) throws LifecycleException {
if (serverSocket != null) {
try {
serverSocket.close();
@@ -102,6 +140,17 @@ public class NetcatSource extends Abstra
logger.error("Unable to close socket. Exception follows.", e);
}
}
+
+ if (walWriter != null) {
+ try {
+ walWriter.flush();
+ walWriter.close();
+ } catch (IOException e) {
+ throw new LifecycleException(
+ "Unable to flush WAL on close - POTENTIAL DATA LOSS! Exception follows.",
+ e);
+ }
+ }
}
public int getPort() {
@@ -112,4 +161,12 @@ public class NetcatSource extends Abstra
this.port = port;
}
+ public WALManager getWALManager() {
+ return walManager;
+ }
+
+ public void setWALManager(WALManager walManager) {
+ this.walManager = walManager;
+ }
+
}