You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jm...@apache.org on 2011/08/25 14:16:18 UTC
svn commit: r1161531 - in /incubator/flume/trunk/flume-core/src:
main/java/com/cloudera/flume/collector/ main/java/com/cloudera/flume/conf/
main/java/com/cloudera/flume/handlers/hdfs/
test/java/com/cloudera/flume/collector/
Author: jmhsieh
Date: Thu Aug 25 12:16:17 2011
New Revision: 1161531
URL: http://svn.apache.org/viewvc?rev=1161531&view=rev
Log:
FLUME-720: CollectorSink doesn't pass the new format parameter
Added:
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java
Modified:
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeBuilder.java
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java?rev=1161531&r1=1161530&r2=1161531&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java Thu Aug 25 12:16:17 2011
@@ -296,7 +296,6 @@ public class CollectorSink extends Event
String logdir = FlumeConfiguration.get().getCollectorDfsDir(); // default
long millis = FlumeConfiguration.get().getCollectorRollMillis();
String prefix = "";
- Object format = null;
if (argv.length >= 1) {
logdir = argv[0].toString();
}
@@ -307,6 +306,14 @@ public class CollectorSink extends Event
// TODO eventually Long instead of String
millis = Long.parseLong(argv[2].toString());
}
+
+ // try to get format from context
+ Object format = context.getObj("format", Object.class);
+ if (format == null) {
+ // used to be strings but now must be a func spec
+ String formatString = FlumeConfiguration.get().getDefaultOutputFormat();
+ format = new FunctionSpec(formatString);
+ }
if (argv.length >= 4) {
// shove format in to context to pass down.
format = argv[3];
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeBuilder.java?rev=1161531&r1=1161530&r2=1161531&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeBuilder.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeBuilder.java Thu Aug 25 12:16:17 2011
@@ -390,7 +390,7 @@ public class FlumeBuilder {
String name;
Object[] args;
- FunctionSpec(String name, Object... args) {
+ public FunctionSpec(String name, Object... args) {
this.name = name;
this.args = args;
}
Added: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java?rev=1161531&view=auto
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java (added)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java Thu Aug 25 12:16:17 2011
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */package com.cloudera.flume.conf;
+
+import org.antlr.runtime.RecognitionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.flume.conf.FlumeBuilder.FunctionSpec;
+import com.cloudera.flume.handlers.text.FormatFactory;
+import com.cloudera.flume.handlers.text.output.OutputFormat;
+import com.google.common.base.Preconditions;
+
+/**
+ * This class contains helpers for sink builders and source builders to use.
+ * This is where code such as context resolution can be handled.
+ */
+public class SinkBuilderUtil {
+ private static Logger LOG = LoggerFactory.getLogger(SinkBuilderUtil.class);
+
+ /**
+ * This converts and creates an instance of an OutputFormat. It uses the first
+ * available of: 1) an explicit argument, either an output format FunctionSpec
+ * or a deprecated string that is converted to a function spec with a warning,
+ * 2) a FunctionSpec called "format" in the context, or 3) the default from the
+ * flume-site.xml file.
+ *
+ * @param ctx
+ * @param arg
+ * @return
+ * @throws FlumeSpecException
+ */
+ public static OutputFormat resolveOutputFormat(Context ctx, Object arg)
+ throws FlumeSpecException {
+ // If an argument is specified, use it with highest precedence.
+ if (arg != null) {
+ // this will warn about deprecation if it is a string instead of an
+ // output format spec
+ return FlumeBuilder.createFormat(FormatFactory.get(), arg);
+ }
+
+ // Next try the context. This must be an output format FunctionSpec.
+ Object format = ctx.getObj("format", Object.class);
+ if (format != null) {
+ return FlumeBuilder.createFormat(FormatFactory.get(), format);
+ }
+
+ // Last attempt, load from xml config file, and parse it. Ideally the
+ // FlumeConfiguration settings would be in the context, but we don't support
+ // this yet.
+ String strFormat = FlumeConfiguration.get().getDefaultOutputFormat();
+ try {
+ format = FlumeBuilder.buildSimpleArg(FlumeBuilder.parseArg(strFormat));
+ } catch (RecognitionException e) {
+ throw new FlumeSpecException("Unable to parse output format: "
+ + strFormat);
+ }
+ return FlumeBuilder.createFormat(FormatFactory.get(), format);
+ }
+}
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java?rev=1161531&r1=1161530&r2=1161531&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java Thu Aug 25 12:16:17 2011
@@ -37,6 +37,7 @@ import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
+import com.cloudera.flume.conf.SinkBuilderUtil;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
@@ -199,15 +200,22 @@ public class CustomDfsSink extends Event
"usage: customdfs(\"[(hdfs|file|s3n|...)://namenode[:port]]/path\", format)");
}
- Object format = (args.length == 1) ? null : args[1];
- OutputFormat fmt;
+ Object formatArg = null;
+ if (args.length >= 2) {
+ formatArg = args[1];
+ }
+
+ OutputFormat o = null;
try {
- fmt = FlumeBuilder.createFormat(FormatFactory.get(), format);
+ o = SinkBuilderUtil.resolveOutputFormat(context, formatArg);
} catch (FlumeSpecException e) {
- LOG.error("failed to load format " + format, e);
- throw new IllegalArgumentException("failed to load format " + format);
+ LOG.warn("Illegal format type " + formatArg + ".", e);
+ throw new IllegalArgumentException("failed to load format", e);
}
- return new CustomDfsSink(args[0].toString(), fmt);
+ Preconditions.checkArgument(o != null, "Illegal format type "
+ + formatArg + ".");
+
+ return new CustomDfsSink(args[0].toString(), o);
}
@Deprecated
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java?rev=1161531&r1=1161530&r2=1161531&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java Thu Aug 25 12:16:17 2011
@@ -31,6 +31,7 @@ import com.cloudera.flume.conf.FlumeBuil
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
+import com.cloudera.flume.conf.SinkBuilderUtil;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
@@ -164,20 +165,19 @@ public class EscapedCustomDfsSink extend
filename = args[1].toString();
}
- Object format = FlumeConfiguration.get().getDefaultOutputFormat();
+ Object formatArg = null;
if (args.length >= 3) {
- format = args[2];
+ formatArg = args[2];
}
- OutputFormat o;
+ OutputFormat o = null;
try {
- o = FlumeBuilder.createFormat(FormatFactory.get(), format);
+ o = SinkBuilderUtil.resolveOutputFormat(context, formatArg);
} catch (FlumeSpecException e) {
- LOG.warn("Illegal format type " + format + ".", e);
- o = null;
+ LOG.warn("Illegal format type " + formatArg + ".", e);
}
- Preconditions.checkArgument(o != null, "Illegal format type " + format
- + ".");
+ Preconditions.checkArgument(o != null, "Illegal format type "
+ + formatArg + ".");
return new EscapedCustomDfsSink(args[0].toString(), filename, o);
}
Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java?rev=1161531&r1=1161530&r2=1161531&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java Thu Aug 25 12:16:17 2011
@@ -25,10 +25,9 @@ import static org.mockito.Mockito.doThro
import static org.mockito.Mockito.mock;
import java.io.File;
+import java.io.FileFilter;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -47,7 +46,6 @@ import com.cloudera.flume.agent.durabili
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeArgException;
import com.cloudera.flume.conf.FlumeBuilder;
-import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.LogicalNodeContext;
import com.cloudera.flume.conf.ReportTestingContext;
@@ -66,16 +64,15 @@ import com.cloudera.flume.handlers.endto
import com.cloudera.flume.handlers.endtoend.AckListener;
import com.cloudera.flume.handlers.hdfs.CustomDfsSink;
import com.cloudera.flume.handlers.hdfs.EscapedCustomDfsSink;
+import com.cloudera.flume.handlers.hdfs.SeqfileEventSource;
import com.cloudera.flume.handlers.rolling.ProcessTagger;
import com.cloudera.flume.handlers.rolling.RollSink;
import com.cloudera.flume.handlers.rolling.Tagger;
-import com.cloudera.flume.handlers.thrift.Priority;
-import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.flume.reporter.ReportManager;
-import com.cloudera.util.FlumeTestHarness;
import com.cloudera.util.Clock;
import com.cloudera.util.FileUtil;
+import com.cloudera.util.FlumeTestHarness;
import com.cloudera.util.Pair;
/**
@@ -889,4 +886,35 @@ public class TestCollectorSink {
coll.close();
}
+ @Test(timeout=5000)
+ public void testCollectorOutputFormat() throws IOException,
+ FlumeSpecException, InterruptedException {
+ File f = FileUtil.mktempdir();
+ try {
+ final EventSink coll = FlumeBuilder.buildSink(
+ LogicalNodeContext.testingContext(), "collectorSink(" + "\"file://"
+ + f.getPath() + "\", \"prefix\", 5000, seqfile(\"bzip2\"))");
+ coll.open();
+ coll.append(new EventImpl("test".getBytes()));
+ coll.close();
+
+ // get the file generated by collector, and check to make sure it is a
+ // seqfile.
+ File f2 = f.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File arg) {
+ return arg.getName().startsWith("prefix");
+ }
+
+ })[0];
+ EventSource src = new SeqfileEventSource(f2.getAbsolutePath());
+ src.open();
+ Event e = src.next();
+ src.close();
+ assertEquals(new String(e.getBody()), "test");
+
+ } finally {
+ FileUtil.rmr(f);
+ }
+ }
}