You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jm...@apache.org on 2011/10/01 00:55:54 UTC

svn commit: r1177872 - in /incubator/flume/trunk/flume-core/src: main/java/com/cloudera/flume/conf/ main/java/com/cloudera/flume/handlers/hdfs/ test/java/com/cloudera/flume/handlers/hdfs/ test/java/com/cloudera/flume/handlers/rolling/

Author: jmhsieh
Date: Fri Sep 30 22:55:54 2011
New Revision: 1177872

URL: http://svn.apache.org/viewvc?rev=1177872&view=rev
Log:
FLUME-734: escapedFormatDfs goes into a file creation frenzy

Modified:
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java?rev=1177872&r1=1177871&r2=1177872&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java Fri Sep 30 22:55:54 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.cloudera.flume.conf.FlumeBuilder.FunctionSpec;
 import com.cloudera.util.Pair;
 import com.google.common.base.Preconditions;
 
@@ -948,6 +949,21 @@ public class FlumeConfiguration extends 
     return get(COLLECTOR_OUTPUT_FORMAT, "avrojson");
   }
 
+  public FunctionSpec getDefaultOutputFormatSpec() {
+    // See FormatFactory for possible values; unusable specs fall back to default.
+    String defaultFormat = "avrojson";
+    String fmt = get(COLLECTOR_OUTPUT_FORMAT, defaultFormat);
+    try {
+      Object spec = FlumeBuilder.buildSimpleArg(FlumeBuilder.parseArg(fmt));
+      return (spec instanceof String) ? new FunctionSpec((String) spec) // legacy
+          : (FunctionSpec) spec; // any other type: cast fails, handled below
+    } catch (Exception e) {
+      LOG.warn("Problem parsing output format '" + fmt + "'; defaulting to "
+          + defaultFormat, e);
+      return new FunctionSpec(defaultFormat);
+    }
+  }
+
   public String getGangliaServers() {
     // gmond's default multicast ip and port
     return get(GANGLIA_SERVERS, "239.2.11.71:8649");
@@ -1013,8 +1029,8 @@ public class FlumeConfiguration extends 
     if (home == null) {
       home = ".";
     }
-    return home + File.separator + get(WEBAPP_ROOT_MASTER,
-      "webapps/flumemaster.war");
+    return home + File.separator
+        + get(WEBAPP_ROOT_MASTER, "webapps/flumemaster.war");
   }
 
   /**
@@ -1025,8 +1041,8 @@ public class FlumeConfiguration extends 
     if (home == null) {
       home = ".";
     }
-    return home + File.separator + get(WEBAPP_ROOT_NODE,
-        "webapps/flumeagent.war");
+    return home + File.separator
+        + get(WEBAPP_ROOT_NODE, "webapps/flumeagent.war");
   }
 
   /**

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java?rev=1177872&r1=1177871&r2=1177872&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/SinkBuilderUtil.java Fri Sep 30 22:55:54 2011
@@ -73,4 +73,8 @@ public class SinkBuilderUtil {
     }
     return FlumeBuilder.createFormat(FormatFactory.get(), format);
   }
+
+  public static FunctionSpec getDefaultOutputFormatSpec() { // static shortcut
+    return FlumeConfiguration.get().getDefaultOutputFormatSpec();
+  }
 }

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java?rev=1177872&r1=1177871&r2=1177872&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedCustomDfsSink.java Fri Sep 30 22:55:54 2011
@@ -55,7 +55,8 @@ import com.google.common.base.Preconditi
 public class EscapedCustomDfsSink extends EventSink.Base {
   static final Logger LOG = LoggerFactory.getLogger(EscapedCustomDfsSink.class);
   final String path;
-  OutputFormat format;
+  FunctionSpec formatSpec;
+  Context ctx;
 
   CustomDfsSink writer = null;
 
@@ -71,11 +72,13 @@ public class EscapedCustomDfsSink extend
   private String filename = "";
   protected String absolutePath = "";
 
-  public EscapedCustomDfsSink(String path, String filename, OutputFormat o) {
+  public EscapedCustomDfsSink(Context ctx, String path, String filename,
+      FunctionSpec fs) {
+    this.ctx = ctx;
     this.path = path;
     this.filename = filename;
     shouldSub = Event.containsTag(path) || Event.containsTag(filename);
-    this.format = o;
+    this.formatSpec = fs;
     absolutePath = path;
     if (filename != null && filename.length() > 0) {
       if (!absolutePath.endsWith(Path.SEPARATOR)) {
@@ -95,12 +98,21 @@ public class EscapedCustomDfsSink extend
     }
   }
 
-  public EscapedCustomDfsSink(String path, String filename) {
-    this(path, filename, getDefaultOutputFormat());
+  public EscapedCustomDfsSink(Context ctx, String path, String filename) {
+    this(ctx, path, filename, SinkBuilderUtil.getDefaultOutputFormatSpec());
   }
 
   protected CustomDfsSink openWriter(String p) throws IOException {
     LOG.info("Opening " + p);
+    // We need to instantiate a new outputFormat for each CustomDfsSink.
+    OutputFormat format;
+    try {
+      format = SinkBuilderUtil.resolveOutputFormat(ctx, formatSpec);
+    } catch (FlumeSpecException e) {
+      format = getDefaultOutputFormat();
+      LOG.warn("Had problem creating format " + formatSpec
+          + "; reverting to default:" + format);
+    }
     CustomDfsSink w = new CustomDfsSink(p, format);
     w.open();
     return w;
@@ -179,7 +191,14 @@ public class EscapedCustomDfsSink extend
         Preconditions.checkArgument(o != null, "Illegal format type "
             + formatArg + ".");
 
-        return new EscapedCustomDfsSink(args[0].toString(), filename, o);
+        // handle legacy string format arguments 
+        // TODO only support FunctionSpec in the future
+        if (formatArg instanceof String) {
+          formatArg = new FunctionSpec((String) formatArg);
+        }
+        FunctionSpec formatFunc = (FunctionSpec) formatArg;
+        return new EscapedCustomDfsSink(context, args[0].toString(), filename,
+            formatFunc);
       }
 
       @Deprecated

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java?rev=1177872&r1=1177871&r2=1177872&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java Fri Sep 30 22:55:54 2011
@@ -204,4 +204,33 @@ public class TestEscapedCustomOutputDfs 
     FlumeBuilder.buildSink(new Context(), src);
   }
 
+  /**
+   * Added with the FLUME-734 fix: some output formats cache an output stream,
+   * so each hdfs file needs its own output format rather than a shared one.
+   * Smoke test only: it passes when open/append/append/close do not throw.
+   * @throws IOException
+   * @throws FlumeSpecException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testNoOutputFormatSharingProblem() throws IOException,
+      FlumeSpecException, InterruptedException {
+    File f = FileUtil.mktempdir("newFileOutputFormatPer");
+    String snk = "escapedFormatDfs(\"file://" + f.getAbsoluteFile()
+        + "\", \"%{nanos}\", seqfile)";
+
+    Event e1 = new EventImpl("e1".getBytes());
+    Event e2 = new EventImpl("e2".getBytes());
+
+    EventSink evtSnk = FlumeBuilder.buildSink(new Context(), snk);
+
+    try {
+      evtSnk.open();
+      evtSnk.append(e1);
+      evtSnk.append(e2);
+      evtSnk.close();
+    } finally {
+      FileUtil.rmr(f);
+    }
+  }
 }

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java?rev=1177872&r1=1177871&r2=1177872&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java Fri Sep 30 22:55:54 2011
@@ -90,7 +90,7 @@ public class TestRollSink {
         10000), 250) {
       @Override
       protected EventSink newSink(Context ctx) throws IOException {
-        return new EscapedCustomDfsSink("file:///" + f.getPath(),
+        return new EscapedCustomDfsSink(ctx, "file:///" + f.getPath(),
             "sub-%{service}%{rolltag}");
       }
     };
@@ -417,7 +417,7 @@ public class TestRollSink {
         10000), 250) {
       @Override
       protected EventSink newSink(Context ctx) throws IOException {
-        return new EscapedCustomDfsSink("file:///" + f.getPath(),
+        return new EscapedCustomDfsSink(ctx, "file:///" + f.getPath(),
             "sub-%{service}%{rolltag}");
       }
     };