You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text version.
Posted to commits@flume.apache.org by jm...@apache.org on 2011/10/01 00:55:54 UTC
svn commit: r1177872 - in /incubator/flume/trunk/flume-core/src:
main/java/com/cloudera/flume/conf/
main/java/com/cloudera/flume/handlers/hdfs/
test/java/com/cloudera/flume/handlers/hdfs/
test/java/com/cloudera/flume/handlers/rolling/
Author: jmhsieh
Date: Fri Sep 30 22:55:54 2011
New Revision: 1177872
URL: http://svn.apache.org/viewvc?rev=1177872&view=rev
Log:
FLUME-734: escapedFormatDfs goes into a file creation frenzy
Modified:
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java?rev=1177872&r1=1177871&r2=1177872&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java Fri Sep 30 22:55:54 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.cloudera.flume.conf.FlumeBuilder.FunctionSpec;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;
@@ -948,6 +949,21 @@ public class FlumeConfiguration extends
return get(COLLECTOR_OUTPUT_FORMAT, "avrojson");
}
+ public FunctionSpec getDefaultOutputFormatSpec() {
+ // Look at FormatFactory for possible values.
+ String defaultFormat = "avrojson";
+ String fmt = get(COLLECTOR_OUTPUT_FORMAT, defaultFormat);
+ Object fmtSpec = null;
+ try {
+ fmtSpec = FlumeBuilder.buildSimpleArg(FlumeBuilder.parseArg(fmt));
+ } catch (Exception e) {
+ LOG.warn("Problem parsing output format '" + fmt + "'; defaulting to "
+ + defaultFormat);
+ fmtSpec = new FunctionSpec(defaultFormat);
+ }
+ return (FunctionSpec)fmtSpec;
+ }
+
public String getGangliaServers() {
// gmond's default multicast ip and port
return get(GANGLIA_SERVERS, "239.2.11.71:8649");
@@ -1013,8 +1029,8 @@ public class FlumeConfiguration extends
if (home == null) {
home = ".";
}
- return home + File.separator + get(WEBAPP_ROOT_MASTER,
- "webapps/flumemaster.war");
+ return home + File.separator
+ + get(WEBAPP_ROOT_MASTER, "webapps/flumemaster.war");
}
/**
@@ -1025,8 +1041,8 @@ public class FlumeConfiguration extends
if (home == null) {
home = ".";
}
- return home + File.separator + get(WEBAPP_ROOT_NODE,
- "webapps/flumeagent.war");
+ return home + File.separator
+ + get(WEBAPP_ROOT_NODE, "webapps/flumeagent.war");
}
/**
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java?rev=1177872&r1=1177871&r2=1177872&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java Fri Sep 30 22:55:54 2011
@@ -73,4 +73,8 @@ public class SinkBuilderUtil {
}
return FlumeBuilder.createFormat(FormatFactory.get(), format);
}
+
+ public static FunctionSpec getDefaultOutputFormatSpec() {
+ return FlumeConfiguration.get().getDefaultOutputFormatSpec();
+ }
}
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java?rev=1177872&r1=1177871&r2=1177872&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java Fri Sep 30 22:55:54 2011
@@ -55,7 +55,8 @@ import com.google.common.base.Preconditi
public class EscapedCustomDfsSink extends EventSink.Base {
static final Logger LOG = LoggerFactory.getLogger(EscapedCustomDfsSink.class);
final String path;
- OutputFormat format;
+ FunctionSpec formatSpec;
+ Context ctx;
CustomDfsSink writer = null;
@@ -71,11 +72,13 @@ public class EscapedCustomDfsSink extend
private String filename = "";
protected String absolutePath = "";
- public EscapedCustomDfsSink(String path, String filename, OutputFormat o) {
+ public EscapedCustomDfsSink(Context ctx, String path, String filename,
+ FunctionSpec fs) {
+ this.ctx = ctx;
this.path = path;
this.filename = filename;
shouldSub = Event.containsTag(path) || Event.containsTag(filename);
- this.format = o;
+ this.formatSpec = fs;
absolutePath = path;
if (filename != null && filename.length() > 0) {
if (!absolutePath.endsWith(Path.SEPARATOR)) {
@@ -95,12 +98,21 @@ public class EscapedCustomDfsSink extend
}
}
- public EscapedCustomDfsSink(String path, String filename) {
- this(path, filename, getDefaultOutputFormat());
+ public EscapedCustomDfsSink(Context ctx, String path, String filename) {
+ this(ctx, path, filename, SinkBuilderUtil.getDefaultOutputFormatSpec());
}
protected CustomDfsSink openWriter(String p) throws IOException {
LOG.info("Opening " + p);
+ // We need to instantiate a new outputFormat for each CustomDfsSink.
+ OutputFormat format;
+ try {
+ format = SinkBuilderUtil.resolveOutputFormat(ctx, formatSpec);
+ } catch (FlumeSpecException e) {
+ format = getDefaultOutputFormat();
+ LOG.warn("Had problem creating format " + formatSpec
+ + "; reverting to default:" + format);
+ }
CustomDfsSink w = new CustomDfsSink(p, format);
w.open();
return w;
@@ -179,7 +191,14 @@ public class EscapedCustomDfsSink extend
Preconditions.checkArgument(o != null, "Illegal format type "
+ formatArg + ".");
- return new EscapedCustomDfsSink(args[0].toString(), filename, o);
+ // handle legacy string format arguments
+ // TODO only support FunctionSpec in the future
+ if (formatArg instanceof String) {
+ formatArg = new FunctionSpec((String) formatArg);
+ }
+ FunctionSpec formatFunc = (FunctionSpec) formatArg;
+ return new EscapedCustomDfsSink(context, args[0].toString(), filename,
+ formatFunc);
}
@Deprecated
Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java?rev=1177872&r1=1177871&r2=1177872&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java Fri Sep 30 22:55:54 2011
@@ -204,4 +204,33 @@ public class TestEscapedCustomOutputDfs
FlumeBuilder.buildSink(new Context(), src);
}
+ /**
+ * Some output formats cache an output stream and each hdfs file thus needs to
+ * make sure it has its own copy of the outputStream.
+ *
+ * @throws IOException
+ * @throws FlumeSpecException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testNoOutputFormatSharingProblem() throws IOException,
+ FlumeSpecException, InterruptedException {
+ File f = FileUtil.mktempdir("newFileOutputFormatPer");
+ String snk = "escapedFormatDfs(\"file://" + f.getAbsoluteFile()
+ + "\", \"%{nanos}\", seqfile)";
+
+ Event e1 = new EventImpl("e1".getBytes());
+ Event e2 = new EventImpl("e2".getBytes());
+
+ EventSink evtSnk = FlumeBuilder.buildSink(new Context(), snk);
+
+ try {
+ evtSnk.open();
+ evtSnk.append(e1);
+ evtSnk.append(e2);
+ evtSnk.close();
+ } finally {
+ FileUtil.rmr(f);
+ }
+ }
}
Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java?rev=1177872&r1=1177871&r2=1177872&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java Fri Sep 30 22:55:54 2011
@@ -90,7 +90,7 @@ public class TestRollSink {
10000), 250) {
@Override
protected EventSink newSink(Context ctx) throws IOException {
- return new EscapedCustomDfsSink("file:///" + f.getPath(),
+ return new EscapedCustomDfsSink(ctx, "file:///" + f.getPath(),
"sub-%{service}%{rolltag}");
}
};
@@ -417,7 +417,7 @@ public class TestRollSink {
10000), 250) {
@Override
protected EventSink newSink(Context ctx) throws IOException {
- return new EscapedCustomDfsSink("file:///" + f.getPath(),
+ return new EscapedCustomDfsSink(ctx, "file:///" + f.getPath(),
"sub-%{service}%{rolltag}");
}
};