You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by pr...@apache.org on 2012/01/31 20:38:26 UTC

svn commit: r1238766 - in /incubator/flume/trunk: ./ flume-core/ flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/ flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/ flume-core/src/test/java/com/cloudera/flume/agent/durability/ flum...

Author: prasadm
Date: Tue Jan 31 19:38:26 2012
New Revision: 1238766

URL: http://svn.apache.org/viewvc?rev=1238766&view=rev
Log:
FLUME-937: Make Flume compile against Hadoop 0.23
New profile for hadoop-0.23 and additional code changes to handle SequenceFile wrappers

Modified:
    incubator/flume/trunk/flume-core/pom.xml
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSink.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/SequenceFileOutputFormat.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALManager.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java
    incubator/flume/trunk/pom.xml

Modified: incubator/flume/trunk/flume-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/pom.xml?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/pom.xml (original)
+++ incubator/flume/trunk/flume-core/pom.xml Tue Jan 31 19:38:26 2012
@@ -15,6 +15,27 @@
   <properties>
     <build.revision>NOT AVAILABLE</build.revision>
   </properties>
+
+  <profiles>
+    <profile>
+      <id>hadoop-0.23</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>23</value>
+        </property>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+          <scope>provided</scope>
+        </dependency>
+        </dependencies>
+    </profile>
+  </profiles>
+
   <build>
     <plugins>
 
@@ -64,6 +85,10 @@
                   </filterset>
                 </copy>
               </target>
+	            <excludes>
+               <exclude>${FlushingSeqFile}</exclude>
+               <exclude>${RawSeqFile}</exclude>
+	            </excludes>
             </configuration>
           </execution>
         </executions>
@@ -299,7 +324,8 @@
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
+      <artifactId>${hadoop.common.artifact.id}</artifactId>
+      <version>${hadoop.version}</version>
     </dependency>
 
     <dependency>

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSink.java?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSink.java Tue Jan 31 19:38:26 2012
@@ -17,14 +17,17 @@
  */
 package com.cloudera.flume.handlers.hdfs;
 
+import static org.apache.hadoop.util.VersionInfo.getVersion;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.FlushingSequenceFileWriter;
-import org.apache.hadoop.io.RawSequenceFileWriter;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.slf4j.Logger;
@@ -60,6 +63,7 @@ public class SeqfileEventSink extends Ev
   /*
    * This is assumed to be open
    */
+  @SuppressWarnings({ "rawtypes", "unchecked"})
   public void open() throws IOException {
     LOG.debug("opening " + f);
 
@@ -74,21 +78,47 @@ public class SeqfileEventSink extends Ev
     FileSystem fs = FileSystem.getLocal(conf);
 
     try {
+      Method mGetWriter;
+      Class seqClass;
 
-      if (conf.getWALOutputBuffering()) {
-        writer = RawSequenceFileWriter.createWriter(fs, conf,
-            new Path(f.getAbsolutePath()), WriteableEventKey.class,
-            WriteableEvent.class, CompressionType.NONE);
+      // The sequence file wrappers which are not compiling with 0.23
+      // and not needed with newer HDFS
+      String tmpVer = getVersion().substring(0, 4);
+      if (tmpVer.compareToIgnoreCase("0.23") < 0) {
+        if (conf.getWALOutputBuffering()) {
+          seqClass = Class.forName("org.apache.hadoop.io.RawSequenceFileWriter");
+          mGetWriter = seqClass.getMethod("createWriter", FileSystem.class,
+              Configuration.class, Path.class, Class.class, Class.class,
+              CompressionType.class);
+          writer = (SequenceFile.Writer) mGetWriter.invoke(null, fs, conf,
+              new Path(f.getAbsolutePath()), WriteableEventKey.class,
+              WriteableEvent.class, CompressionType.NONE);
+        } else {
+          seqClass = Class.forName("org.apache.hadoop.io.FlushingSequenceFileWriter");
+          mGetWriter = seqClass.getMethod("createWriter", FileSystem.class,
+              Configuration.class, File.class, Class.class, Class.class);
+          writer = (SequenceFile.Writer) mGetWriter.invoke(null, conf, f,
+              WriteableEventKey.class, WriteableEvent.class);
+          }
 
       } else {
-        writer = FlushingSequenceFileWriter.createWriter(conf, f,
-            WriteableEventKey.class, WriteableEvent.class);
-
+        writer = SequenceFile.createWriter(fs, conf,
+            new Path(f.getAbsolutePath()), WriteableEventKey.class,
+            WriteableEvent.class, CompressionType.NONE);
       }
-
     } catch (FileNotFoundException fnfe) {
       LOG.error("Possible permissions problem when creating " + f, fnfe);
       throw fnfe;
+    } catch (ClassNotFoundException eC) {
+      throw new RuntimeException(eC);
+    } catch (InvocationTargetException eI) {
+      throw new RuntimeException(eI);
+    } catch (IllegalAccessException el) {
+      throw new RuntimeException(el);
+    } catch (NoSuchMethodException eN) {
+      throw new RuntimeException(eN);
+    } catch (SecurityException eS) {
+      throw new RuntimeException(eS);
     }
   }
 

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/SequenceFileOutputFormat.java?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/SequenceFileOutputFormat.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/SequenceFileOutputFormat.java Tue Jan 31 19:38:26 2012
@@ -30,6 +30,8 @@ import com.google.common.base.Preconditi
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.io.SequenceFile;
@@ -37,6 +39,8 @@ import org.apache.hadoop.io.SequenceFile
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
+import static org.apache.hadoop.mapred.SequenceFileOutputFormat.getOutputCompressionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,8 +62,8 @@ public class SequenceFileOutputFormat ex
   private Writer writer;
   
   public SequenceFileOutputFormat() {
-    this(SequenceFile.getCompressionType(FlumeConfiguration.get()),
-        new DefaultCodec());
+    this(getOutputCompressionType(new JobConf(FlumeConfiguration.get())),
+         new DefaultCodec());
   }
   
   public SequenceFileOutputFormat(CompressionType compressionType,
@@ -105,7 +109,7 @@ public class SequenceFileOutputFormat ex
         if (args.length > 0) {
           codecName = args[0];
           FlumeConfiguration conf = FlumeConfiguration.get();
-          CompressionType compressionType = SequenceFile.getCompressionType(conf);
+          CompressionType compressionType = getOutputCompressionType(new JobConf(conf));
           CompressionCodec codec = CustomDfsSink.getCodec(conf, codecName);
           format = new SequenceFileOutputFormat(compressionType, codec);
         } else {
@@ -124,4 +128,5 @@ public class SequenceFileOutputFormat ex
 
     };
   }
+
 }

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java Tue Jan 31 19:38:26 2012
@@ -239,7 +239,7 @@ public class TestFlumeNodeWALNotifierRac
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test(timeout = 100000)
+  @Test(timeout = 300000)
   public void testRetryWriting() throws IOException, InterruptedException {
     final int count = 10000;
     retryWritingRacynessRun(count);

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALManager.java?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALManager.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALManager.java Tue Jan 31 19:38:26 2012
@@ -20,6 +20,7 @@ package com.cloudera.flume.agent.durabil
 import static org.junit.Assert.assertEquals;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Arrays;
 
@@ -45,6 +46,16 @@ public class TestNaiveFileWALManager {
   // has 5 good entries.
   final static String WAL_OK = "data/hadoop_logs_5.hdfs";
   public static Logger LOG = Logger.getLogger(TestNaiveFileWALManager.class);
+  // exclude CRC files from the listing
+  private FilenameFilter crcFilter = new FilenameFilter() {
+    @Override
+    public boolean accept(File dir, String name) {
+      if (name.endsWith(".crc")) {
+        return false;
+    }
+    return true;
+    }
+  };
 
   @Before
   public void setUp() {
@@ -163,15 +174,15 @@ public class TestNaiveFileWALManager {
     wal.recover();
 
     // check to make sure wal file is gone
-    // assertTrue(new File(tmp, "import").list().length == 0);
-    assertEquals(0, new File(tmp, "writing").list().length);
-    assertEquals(0, new File(tmp, "sending").list().length);
-    assertEquals(0, new File(tmp, "sent").list().length);
-    assertEquals(0, new File(tmp, "done").list().length);
+    // assertTrue(new File(tmp, "import").list(crcFilter).length == 0);
+    assertEquals(0, new File(tmp, "writing").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "sending").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "sent").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "done").list(crcFilter).length);
     // pre-existing error, and writing didn't have proper ack wrappers
-    assertEquals(4, new File(tmp, "error").list().length);
+    assertEquals(4, new File(tmp, "error").list(crcFilter).length);
     // logged, writing, sending, sent
-    assertEquals(4, new File(tmp, "logged").list().length);
+    assertEquals(4, new File(tmp, "logged").list(crcFilter).length);
 
     FlumeTestHarness.cleanupLocalWriteDir();
   }
@@ -205,13 +216,13 @@ public class TestNaiveFileWALManager {
     wal.recover();
 
     // check to make sure wal file is gone
-    // assertTrue(new File(tmp, "import").list().length == 0);
-    assertEquals(0, new File(tmp, "writing").list().length);
-    assertEquals(0, new File(tmp, "sending").list().length);
-    assertEquals(0, new File(tmp, "sent").list().length);
-    assertEquals(0, new File(tmp, "done").list().length);
-    assertEquals(1, new File(tmp, "error").list().length);
-    assertEquals(1, new File(tmp, "logged").list().length);
+    // assertTrue(new File(tmp, "import").list(crcFilter).length == 0);
+    assertEquals(0, new File(tmp, "writing").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "sending").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "sent").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "done").list(crcFilter).length);
+    assertEquals(1, new File(tmp, "error").list(crcFilter).length);
+    assertEquals(1, new File(tmp, "logged").list(crcFilter).length);
 
     FlumeTestHarness.cleanupLocalWriteDir();
   }
@@ -247,13 +258,13 @@ public class TestNaiveFileWALManager {
     wal.recover();
 
     // check to make sure wal file is gone
-    // assertTrue(new File(tmp, "import").list().length == 0);
-    assertEquals(0, new File(tmp, "writing").list().length);
-    assertEquals(0, new File(tmp, "sending").list().length);
-    assertEquals(0, new File(tmp, "sent").list().length);
-    assertEquals(0, new File(tmp, "done").list().length);
-    assertEquals(1, new File(tmp, "error").list().length);
-    assertEquals(1, new File(tmp, "logged").list().length);
+    // assertTrue(new File(tmp, "import").list(crcFilter).length == 0);
+    assertEquals(0, new File(tmp, "writing").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "sending").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "sent").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "done").list(crcFilter).length);
+    assertEquals(1, new File(tmp, "error").list(crcFilter).length);
+    assertEquals(1, new File(tmp, "logged").list(crcFilter).length);
 
     FlumeTestHarness.cleanupLocalWriteDir();
 
@@ -282,13 +293,13 @@ public class TestNaiveFileWALManager {
     wal.recover();
 
     // check to make sure wal file is gone
-    // assertTrue(new File(tmp, "import").list().length == 0);
-    assertEquals(0, new File(tmp, "writing").list().length);
-    assertEquals(0, new File(tmp, "sending").list().length);
-    assertEquals(0, new File(tmp, "sent").list().length);
-    assertEquals(0, new File(tmp, "done").list().length);
-    assertEquals(1, new File(tmp, "error").list().length);
-    assertEquals(1, new File(tmp, "logged").list().length);
+    // assertTrue(new File(tmp, "import").list(crcFilter).length == 0);
+    assertEquals(0, new File(tmp, "writing").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "sending").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "sent").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "done").list(crcFilter).length);
+    assertEquals(1, new File(tmp, "error").list(crcFilter).length);
+    assertEquals(1, new File(tmp, "logged").list(crcFilter).length);
 
     FlumeTestHarness.cleanupLocalWriteDir();
 
@@ -327,13 +338,13 @@ public class TestNaiveFileWALManager {
     wal.recover();
 
     // check to make sure wal file is gone
-    // assertTrue(new File(tmp, "import").list().length == 0);
-    assertEquals(0, new File(tmp, "writing").list().length);
-    assertEquals(0, new File(tmp, "sending").list().length);
-    assertEquals(0, new File(tmp, "sent").list().length);
-    assertEquals(0, new File(tmp, "done").list().length);
-    assertEquals(1, new File(tmp, "error").list().length);
-    assertEquals(1, new File(tmp, "logged").list().length);
+    // assertTrue(new File(tmp, "import").list(crcFilter).length == 0);
+    assertEquals(0, new File(tmp, "writing").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "sending").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "sent").list(crcFilter).length);
+    assertEquals(0, new File(tmp, "done").list(crcFilter).length);
+    assertEquals(1, new File(tmp, "error").list(crcFilter).length);
+    assertEquals(1, new File(tmp, "logged").list(crcFilter).length);
 
     FlumeTestHarness.cleanupLocalWriteDir();
 

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java Tue Jan 31 19:38:26 2012
@@ -139,8 +139,10 @@ public class TestEscapedCustomOutputDfs 
    */
   @Test
   public void testGZipCodec() throws IOException, InterruptedException {
+    GzipCodec codec = new GzipCodec();
+    codec.setConf(FlumeConfiguration.get());
     checkOutputFormat("syslog", new SyslogEntryFormat(), "GzipCodec",
-        new GzipCodec());
+        codec);
   }
 
   /**

Modified: incubator/flume/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/pom.xml?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/pom.xml (original)
+++ incubator/flume/trunk/pom.xml Tue Jan 31 19:38:26 2012
@@ -27,6 +27,10 @@
     <profile>
       <id>full-build</id>
       <activation>
+        <property>
+          <name>full-build.profile</name>
+          <value>yes</value>
+        </property>
         <activeByDefault>false</activeByDefault>
       </activation>
       <modules>
@@ -45,6 +49,9 @@
     <profile>
       <id>dev</id>
       <activation>
+        <property>
+          <name>!full-build.profile</name>
+        </property>
         <activeByDefault>true</activeByDefault>
       </activation>
       <modules>
@@ -84,6 +91,67 @@
       </modules>
     </profile>
 
+    <profile>
+      <id>hadoop-0.20.2-CDH3B4</id>
+      <activation>
+        <property>
+          <name>!hadoop.profile</name>
+        </property>
+      <activeByDefault>true</activeByDefault>
+      </activation>
+      <properties>
+        <hadoop.version>0.20.2-CDH3B4</hadoop.version>
+        <hadoop.common.artifact.id>hadoop-core</hadoop.common.artifact.id>
+        <FlushingSeqFile>bar.java</FlushingSeqFile>
+        <RawSeqFile>foo.java</RawSeqFile>
+      </properties>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>${hadoop.common.artifact.id}</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+    </profile>
+
+    <profile>
+      <id>hadoop-0.23</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>23</value>
+        </property>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <hadoop.version>0.23.1-SNAPSHOT</hadoop.version>
+        <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id>
+        <FlushingSeqFile>**/FlushingSequenceFileWriter.java</FlushingSeqFile>
+        <RawSeqFile>**/RawSequenceFileWriter.java</RawSeqFile>
+      </properties>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>${hadoop.common.artifact.id}</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+    </profile>
+
   </profiles>
 
   <inceptionYear>2009</inceptionYear>
@@ -341,6 +409,15 @@
       </snapshots>
     </repository>
 
+    <repository>
+      <id>apache.staging.https</id>
+      <name>Apache Staging Repository</name>
+      <url>https://repository.apache.org/content/repositories/snapshots/</url>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+
   </repositories>
 
   <build>
@@ -355,6 +432,10 @@
           <configuration>
             <source>1.6</source>
             <target>1.6</target>
+	          <excludes>
+              <exclude>${FlushingSeqFile}</exclude>
+              <exclude>${RawSeqFile}</exclude>
+	          </excludes>
           </configuration>
         </plugin>
 
@@ -588,12 +669,6 @@
       </dependency>
 
       <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-core</artifactId>
-        <version>0.20.2-CDH3B4</version>
-      </dependency>
-
-      <dependency>
         <groupId>org.arabidopsis.ahocorasick</groupId>
         <artifactId>ahocorasick</artifactId>
         <version>2.x</version>