You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jm...@apache.org on 2011/08/25 14:16:18 UTC

svn commit: r1161531 - in /incubator/flume/trunk/flume-core/src: main/java/com/cloudera/flume/collector/ main/java/com/cloudera/flume/conf/ main/java/com/cloudera/flume/handlers/hdfs/ test/java/com/cloudera/flume/collector/

Author: jmhsieh
Date: Thu Aug 25 12:16:17 2011
New Revision: 1161531

URL: http://svn.apache.org/viewvc?rev=1161531&view=rev
Log:
FLUME-720: CollectorSink doesn't pass the new format parameter

Added:
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java
Modified:
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeBuilder.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java?rev=1161531&r1=1161530&r2=1161531&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java Thu Aug 25 12:16:17 2011
@@ -296,7 +296,6 @@ public class CollectorSink extends Event
         String logdir = FlumeConfiguration.get().getCollectorDfsDir(); // default
         long millis = FlumeConfiguration.get().getCollectorRollMillis();
         String prefix = "";
-        Object format = null;
         if (argv.length >= 1) {
           logdir = argv[0].toString();
         }
@@ -307,6 +306,14 @@ public class CollectorSink extends Event
           // TODO eventually Long instead of String
           millis = Long.parseLong(argv[2].toString());
         }
+
+        // try to get format from context
+        Object format = context.getObj("format", Object.class);
+        if (format == null) {
+          // used to be strings but now must be a func spec
+          String formatString = FlumeConfiguration.get().getDefaultOutputFormat();
+          format = new FunctionSpec(formatString);
+        }
         if (argv.length >= 4) {
           // shove format in to context to pass down.
           format = argv[3];

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeBuilder.java?rev=1161531&r1=1161530&r2=1161531&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeBuilder.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeBuilder.java Thu Aug 25 12:16:17 2011
@@ -390,7 +390,7 @@ public class FlumeBuilder {
     String name;
     Object[] args;
 
-    FunctionSpec(String name, Object... args) {
+    public FunctionSpec(String name, Object... args) {
       this.name = name;
       this.args = args;
     }

Added: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java?rev=1161531&view=auto
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java (added)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java Thu Aug 25 12:16:17 2011
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */package com.cloudera.flume.conf;
+
+import org.antlr.runtime.RecognitionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.flume.conf.FlumeBuilder.FunctionSpec;
+import com.cloudera.flume.handlers.text.FormatFactory;
+import com.cloudera.flume.handlers.text.output.OutputFormat;
+import com.google.common.base.Preconditions;
+
+/**
+ * This class contains helpers for sink builders and source builders to use.
+ * This is where code such as context resolution can be handled. 
+ */
+public class SinkBuilderUtil {
+  private static Logger LOG = LoggerFactory.getLogger(SinkBuilderUtil.class);
+
+  /**
+   * This converts and creates an instance of an OutputFormat. It expects either
+   * 1)a default from the flume-site.xml file, which can be overridden by 2) a
+   * FunctionSpec called "format" in the context, 3) an outputformat
+   * FunctionSpec argument, or 4) a string argument that it will convert to a
+   * function spec but warn but convert a deprecated string argument
+   *
+   * @param ctx
+   * @param arg
+   * @return
+   * @throws FlumeSpecException
+   */
+  public static OutputFormat resolveOutputFormat(Context ctx, Object arg)
+      throws FlumeSpecException {
+    // If an argument is specified, use it with highest precedence.
+    if (arg != null) {
+      // this will warn about deprecation if it a string instead of outputformat
+      // spec
+      return FlumeBuilder.createFormat(FormatFactory.get(), arg);
+    }
+
+    // Next try the context. This must be a output format FunctionSpec.
+    Object format = ctx.getObj("format", Object.class);
+    if (format != null) {
+      return FlumeBuilder.createFormat(FormatFactory.get(), format);
+    }
+
+    // Last attempt, load from xml config file, and parse it. Ideally the
+    // FlumeConfiguration settings would be in the context, but we don't support
+    // this yet.
+    String strFormat = FlumeConfiguration.get().getDefaultOutputFormat();
+    try {
+      format = FlumeBuilder.buildSimpleArg(FlumeBuilder.parseArg(strFormat));
+    } catch (RecognitionException e) {
+      throw new FlumeSpecException("Unable to parse output format: "
+          + strFormat);
+    }
+    return FlumeBuilder.createFormat(FormatFactory.get(), format);
+  }
+}

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java?rev=1161531&r1=1161530&r2=1161531&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java Thu Aug 25 12:16:17 2011
@@ -37,6 +37,7 @@ import com.cloudera.flume.conf.Context;
 import com.cloudera.flume.conf.FlumeBuilder;
 import com.cloudera.flume.conf.FlumeConfiguration;
 import com.cloudera.flume.conf.FlumeSpecException;
+import com.cloudera.flume.conf.SinkBuilderUtil;
 import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
 import com.cloudera.flume.core.Event;
 import com.cloudera.flume.core.EventSink;
@@ -199,15 +200,22 @@ public class CustomDfsSink extends Event
               "usage: customdfs(\"[(hdfs|file|s3n|...)://namenode[:port]]/path\", format)");
         }
 
-        Object format = (args.length == 1) ? null : args[1];
-        OutputFormat fmt;
+        Object formatArg = null;
+        if (args.length >= 2) {
+          formatArg = args[1];
+        }
+
+        OutputFormat o = null;
         try {
-          fmt = FlumeBuilder.createFormat(FormatFactory.get(), format);
+          o = SinkBuilderUtil.resolveOutputFormat(context, formatArg);
         } catch (FlumeSpecException e) {
-          LOG.error("failed to load format " + format, e);
-          throw new IllegalArgumentException("failed to load format " + format);
+          LOG.warn("Illegal format type " + formatArg + ".", e);
+          throw new IllegalArgumentException("failed to load format", e);
         }
-        return new CustomDfsSink(args[0].toString(), fmt);
+        Preconditions.checkArgument(o != null, "Illegal format type "
+            + formatArg + ".");
+
+        return new CustomDfsSink(args[0].toString(), o);
       }
 
       @Deprecated

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java?rev=1161531&r1=1161530&r2=1161531&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java Thu Aug 25 12:16:17 2011
@@ -31,6 +31,7 @@ import com.cloudera.flume.conf.FlumeBuil
 import com.cloudera.flume.conf.FlumeBuilder;
 import com.cloudera.flume.conf.FlumeConfiguration;
 import com.cloudera.flume.conf.FlumeSpecException;
+import com.cloudera.flume.conf.SinkBuilderUtil;
 import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
 import com.cloudera.flume.core.Event;
 import com.cloudera.flume.core.EventSink;
@@ -164,20 +165,19 @@ public class EscapedCustomDfsSink extend
           filename = args[1].toString();
         }
 
-        Object format = FlumeConfiguration.get().getDefaultOutputFormat();
+        Object formatArg = null;
         if (args.length >= 3) {
-          format = args[2];
+          formatArg = args[2];
         }
 
-        OutputFormat o;
+        OutputFormat o = null;
         try {
-          o = FlumeBuilder.createFormat(FormatFactory.get(), format);
+          o = SinkBuilderUtil.resolveOutputFormat(context, formatArg);
         } catch (FlumeSpecException e) {
-          LOG.warn("Illegal format type " + format + ".", e);
-          o = null;
+          LOG.warn("Illegal format type " + formatArg + ".", e);
         }
-        Preconditions.checkArgument(o != null, "Illegal format type " + format
-            + ".");
+        Preconditions.checkArgument(o != null, "Illegal format type "
+            + formatArg + ".");
 
         return new EscapedCustomDfsSink(args[0].toString(), filename, o);
       }

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java?rev=1161531&r1=1161530&r2=1161531&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java Thu Aug 25 12:16:17 2011
@@ -25,10 +25,9 @@ import static org.mockito.Mockito.doThro
 import static org.mockito.Mockito.mock;
 
 import java.io.File;
+import java.io.FileFilter;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -47,7 +46,6 @@ import com.cloudera.flume.agent.durabili
 import com.cloudera.flume.conf.Context;
 import com.cloudera.flume.conf.FlumeArgException;
 import com.cloudera.flume.conf.FlumeBuilder;
-import com.cloudera.flume.conf.FlumeConfiguration;
 import com.cloudera.flume.conf.FlumeSpecException;
 import com.cloudera.flume.conf.LogicalNodeContext;
 import com.cloudera.flume.conf.ReportTestingContext;
@@ -66,16 +64,15 @@ import com.cloudera.flume.handlers.endto
 import com.cloudera.flume.handlers.endtoend.AckListener;
 import com.cloudera.flume.handlers.hdfs.CustomDfsSink;
 import com.cloudera.flume.handlers.hdfs.EscapedCustomDfsSink;
+import com.cloudera.flume.handlers.hdfs.SeqfileEventSource;
 import com.cloudera.flume.handlers.rolling.ProcessTagger;
 import com.cloudera.flume.handlers.rolling.RollSink;
 import com.cloudera.flume.handlers.rolling.Tagger;
-import com.cloudera.flume.handlers.thrift.Priority;
-import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent;
 import com.cloudera.flume.reporter.ReportEvent;
 import com.cloudera.flume.reporter.ReportManager;
-import com.cloudera.util.FlumeTestHarness;
 import com.cloudera.util.Clock;
 import com.cloudera.util.FileUtil;
+import com.cloudera.util.FlumeTestHarness;
 import com.cloudera.util.Pair;
 
 /**
@@ -889,4 +886,35 @@ public class TestCollectorSink {
     coll.close();
   }
 
+  @Test(timeout=5000)
+  public void testCollectorOutputFormat() throws IOException,
+      FlumeSpecException, InterruptedException {
+    File f = FileUtil.mktempdir();
+    try {
+      final EventSink coll = FlumeBuilder.buildSink(
+          LogicalNodeContext.testingContext(), "collectorSink(" + "\"file://"
+              + f.getPath() + "\", \"prefix\", 5000, seqfile(\"bzip2\"))");
+      coll.open();
+      coll.append(new EventImpl("test".getBytes()));
+      coll.close();
+
+      // get the file generated by collector, and check to make sure it is a
+      // seqfile.
+      File f2 = f.listFiles(new FileFilter() {
+        @Override
+        public boolean accept(File arg) {
+          return arg.getName().startsWith("prefix");
+        }
+
+      })[0];
+      EventSource src = new SeqfileEventSource(f2.getAbsolutePath());
+      src.open();
+      Event e = src.next();
+      src.close();
+      assertEquals(new String(e.getBody()), "test");
+
+    } finally {
+      FileUtil.rmr(f);
+    }
+  }
 }