You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by pr...@apache.org on 2012/01/31 20:38:26 UTC
svn commit: r1238766 - in /incubator/flume/trunk: ./ flume-core/
flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/
flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/
flume-core/src/test/java/com/cloudera/flume/agent/durability/ flum...
Author: prasadm
Date: Tue Jan 31 19:38:26 2012
New Revision: 1238766
URL: http://svn.apache.org/viewvc?rev=1238766&view=rev
Log:
FLUME-937: Make Flume compile against Hadoop 0.23
New profile for hadoop-0.23 and additional code changes to handle SequenceFile wrappers
Modified:
incubator/flume/trunk/flume-core/pom.xml
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSink.java
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/SequenceFileOutputFormat.java
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALManager.java
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java
incubator/flume/trunk/pom.xml
Modified: incubator/flume/trunk/flume-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/pom.xml?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/pom.xml (original)
+++ incubator/flume/trunk/flume-core/pom.xml Tue Jan 31 19:38:26 2012
@@ -15,6 +15,27 @@
<properties>
<build.revision>NOT AVAILABLE</build.revision>
</properties>
+
+ <profiles>
+ <profile>
+ <id>hadoop-0.23</id>
+ <activation>
+ <property>
+ <name>hadoop.profile</name>
+ <value>23</value>
+ </property>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
<build>
<plugins>
@@ -64,6 +85,10 @@
</filterset>
</copy>
</target>
+ <excludes>
+ <exclude>${FlushingSeqFile}</exclude>
+ <exclude>${RawSeqFile}</exclude>
+ </excludes>
</configuration>
</execution>
</executions>
@@ -299,7 +324,8 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
+ <artifactId>${hadoop.common.artifact.id}</artifactId>
+ <version>${hadoop.version}</version>
</dependency>
<dependency>
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSink.java?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSink.java Tue Jan 31 19:38:26 2012
@@ -17,14 +17,17 @@
*/
package com.cloudera.flume.handlers.hdfs;
+import static org.apache.hadoop.util.VersionInfo.getVersion;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.FlushingSequenceFileWriter;
-import org.apache.hadoop.io.RawSequenceFileWriter;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.slf4j.Logger;
@@ -60,6 +63,7 @@ public class SeqfileEventSink extends Ev
/*
* This is assumed to be open
*/
+ @SuppressWarnings({ "rawtypes", "unchecked"})
public void open() throws IOException {
LOG.debug("opening " + f);
@@ -74,21 +78,47 @@ public class SeqfileEventSink extends Ev
FileSystem fs = FileSystem.getLocal(conf);
try {
+ Method mGetWriter;
+ Class seqClass;
- if (conf.getWALOutputBuffering()) {
- writer = RawSequenceFileWriter.createWriter(fs, conf,
- new Path(f.getAbsolutePath()), WriteableEventKey.class,
- WriteableEvent.class, CompressionType.NONE);
+ // The sequence file wrappers which are not compiling with 0.23
+ // and not needed with newer HDFS
+ String tmpVer = getVersion().substring(0, 4);
+ if (tmpVer.compareToIgnoreCase("0.23") < 0) {
+ if (conf.getWALOutputBuffering()) {
+ seqClass = Class.forName("org.apache.hadoop.io.RawSequenceFileWriter");
+ mGetWriter = seqClass.getMethod("createWriter", FileSystem.class,
+ Configuration.class, Path.class, Class.class, Class.class,
+ CompressionType.class);
+ writer = (SequenceFile.Writer) mGetWriter.invoke(null, fs, conf,
+ new Path(f.getAbsolutePath()), WriteableEventKey.class,
+ WriteableEvent.class, CompressionType.NONE);
+ } else {
+ seqClass = Class.forName("org.apache.hadoop.io.FlushingSequenceFileWriter");
+ mGetWriter = seqClass.getMethod("createWriter", FileSystem.class,
+ Configuration.class, File.class, Class.class, Class.class);
+ writer = (SequenceFile.Writer) mGetWriter.invoke(null, conf, f,
+ WriteableEventKey.class, WriteableEvent.class);
+ }
} else {
- writer = FlushingSequenceFileWriter.createWriter(conf, f,
- WriteableEventKey.class, WriteableEvent.class);
-
+ writer = SequenceFile.createWriter(fs, conf,
+ new Path(f.getAbsolutePath()), WriteableEventKey.class,
+ WriteableEvent.class, CompressionType.NONE);
}
-
} catch (FileNotFoundException fnfe) {
LOG.error("Possible permissions problem when creating " + f, fnfe);
throw fnfe;
+ } catch (ClassNotFoundException eC) {
+ throw new RuntimeException(eC);
+ } catch (InvocationTargetException eI) {
+ throw new RuntimeException(eI);
+ } catch (IllegalAccessException el) {
+ throw new RuntimeException(el);
+ } catch (NoSuchMethodException eN) {
+ throw new RuntimeException(eN);
+ } catch (SecurityException eS) {
+ throw new RuntimeException(eS);
}
}
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/SequenceFileOutputFormat.java?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/SequenceFileOutputFormat.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/seqfile/SequenceFileOutputFormat.java Tue Jan 31 19:38:26 2012
@@ -30,6 +30,8 @@ import com.google.common.base.Preconditi
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
@@ -37,6 +39,8 @@ import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
+import static org.apache.hadoop.mapred.SequenceFileOutputFormat.getOutputCompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,8 +62,8 @@ public class SequenceFileOutputFormat ex
private Writer writer;
public SequenceFileOutputFormat() {
- this(SequenceFile.getCompressionType(FlumeConfiguration.get()),
- new DefaultCodec());
+ this(getOutputCompressionType(new JobConf(FlumeConfiguration.get())),
+ new DefaultCodec());
}
public SequenceFileOutputFormat(CompressionType compressionType,
@@ -105,7 +109,7 @@ public class SequenceFileOutputFormat ex
if (args.length > 0) {
codecName = args[0];
FlumeConfiguration conf = FlumeConfiguration.get();
- CompressionType compressionType = SequenceFile.getCompressionType(conf);
+ CompressionType compressionType = getOutputCompressionType(new JobConf(conf));
CompressionCodec codec = CustomDfsSink.getCodec(conf, codecName);
format = new SequenceFileOutputFormat(compressionType, codec);
} else {
@@ -124,4 +128,5 @@ public class SequenceFileOutputFormat ex
};
}
+
}
Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java Tue Jan 31 19:38:26 2012
@@ -239,7 +239,7 @@ public class TestFlumeNodeWALNotifierRac
* @throws IOException
* @throws InterruptedException
*/
- @Test(timeout = 100000)
+ @Test(timeout = 300000)
public void testRetryWriting() throws IOException, InterruptedException {
final int count = 10000;
retryWritingRacynessRun(count);
Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALManager.java?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALManager.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestNaiveFileWALManager.java Tue Jan 31 19:38:26 2012
@@ -20,6 +20,7 @@ package com.cloudera.flume.agent.durabil
import static org.junit.Assert.assertEquals;
import java.io.File;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
@@ -45,6 +46,16 @@ public class TestNaiveFileWALManager {
// has 5 good entries.
final static String WAL_OK = "data/hadoop_logs_5.hdfs";
public static Logger LOG = Logger.getLogger(TestNaiveFileWALManager.class);
+ // exclude CRC files from the listing
+ private FilenameFilter crcFilter = new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ if (name.endsWith(".crc")) {
+ return false;
+ }
+ return true;
+ }
+ };
@Before
public void setUp() {
@@ -163,15 +174,15 @@ public class TestNaiveFileWALManager {
wal.recover();
// check to make sure wal file is gone
- // assertTrue(new File(tmp, "import").list().length == 0);
- assertEquals(0, new File(tmp, "writing").list().length);
- assertEquals(0, new File(tmp, "sending").list().length);
- assertEquals(0, new File(tmp, "sent").list().length);
- assertEquals(0, new File(tmp, "done").list().length);
+ // assertTrue(new File(tmp, "import").list(crcFilter).length == 0);
+ assertEquals(0, new File(tmp, "writing").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "sending").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "sent").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "done").list(crcFilter).length);
// pre-existing error, and writing didn't have proper ack wrappers
- assertEquals(4, new File(tmp, "error").list().length);
+ assertEquals(4, new File(tmp, "error").list(crcFilter).length);
// logged, writing, sending, sent
- assertEquals(4, new File(tmp, "logged").list().length);
+ assertEquals(4, new File(tmp, "logged").list(crcFilter).length);
FlumeTestHarness.cleanupLocalWriteDir();
}
@@ -205,13 +216,13 @@ public class TestNaiveFileWALManager {
wal.recover();
// check to make sure wal file is gone
- // assertTrue(new File(tmp, "import").list().length == 0);
- assertEquals(0, new File(tmp, "writing").list().length);
- assertEquals(0, new File(tmp, "sending").list().length);
- assertEquals(0, new File(tmp, "sent").list().length);
- assertEquals(0, new File(tmp, "done").list().length);
- assertEquals(1, new File(tmp, "error").list().length);
- assertEquals(1, new File(tmp, "logged").list().length);
+ // assertTrue(new File(tmp, "import").list(crcFilter).length == 0);
+ assertEquals(0, new File(tmp, "writing").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "sending").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "sent").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "done").list(crcFilter).length);
+ assertEquals(1, new File(tmp, "error").list(crcFilter).length);
+ assertEquals(1, new File(tmp, "logged").list(crcFilter).length);
FlumeTestHarness.cleanupLocalWriteDir();
}
@@ -247,13 +258,13 @@ public class TestNaiveFileWALManager {
wal.recover();
// check to make sure wal file is gone
- // assertTrue(new File(tmp, "import").list().length == 0);
- assertEquals(0, new File(tmp, "writing").list().length);
- assertEquals(0, new File(tmp, "sending").list().length);
- assertEquals(0, new File(tmp, "sent").list().length);
- assertEquals(0, new File(tmp, "done").list().length);
- assertEquals(1, new File(tmp, "error").list().length);
- assertEquals(1, new File(tmp, "logged").list().length);
+ // assertTrue(new File(tmp, "import").list(crcFilter).length == 0);
+ assertEquals(0, new File(tmp, "writing").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "sending").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "sent").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "done").list(crcFilter).length);
+ assertEquals(1, new File(tmp, "error").list(crcFilter).length);
+ assertEquals(1, new File(tmp, "logged").list(crcFilter).length);
FlumeTestHarness.cleanupLocalWriteDir();
@@ -282,13 +293,13 @@ public class TestNaiveFileWALManager {
wal.recover();
// check to make sure wal file is gone
- // assertTrue(new File(tmp, "import").list().length == 0);
- assertEquals(0, new File(tmp, "writing").list().length);
- assertEquals(0, new File(tmp, "sending").list().length);
- assertEquals(0, new File(tmp, "sent").list().length);
- assertEquals(0, new File(tmp, "done").list().length);
- assertEquals(1, new File(tmp, "error").list().length);
- assertEquals(1, new File(tmp, "logged").list().length);
+ // assertTrue(new File(tmp, "import").list(crcFilter).length == 0);
+ assertEquals(0, new File(tmp, "writing").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "sending").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "sent").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "done").list(crcFilter).length);
+ assertEquals(1, new File(tmp, "error").list(crcFilter).length);
+ assertEquals(1, new File(tmp, "logged").list(crcFilter).length);
FlumeTestHarness.cleanupLocalWriteDir();
@@ -327,13 +338,13 @@ public class TestNaiveFileWALManager {
wal.recover();
// check to make sure wal file is gone
- // assertTrue(new File(tmp, "import").list().length == 0);
- assertEquals(0, new File(tmp, "writing").list().length);
- assertEquals(0, new File(tmp, "sending").list().length);
- assertEquals(0, new File(tmp, "sent").list().length);
- assertEquals(0, new File(tmp, "done").list().length);
- assertEquals(1, new File(tmp, "error").list().length);
- assertEquals(1, new File(tmp, "logged").list().length);
+ // assertTrue(new File(tmp, "import").list(crcFilter).length == 0);
+ assertEquals(0, new File(tmp, "writing").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "sending").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "sent").list(crcFilter).length);
+ assertEquals(0, new File(tmp, "done").list(crcFilter).length);
+ assertEquals(1, new File(tmp, "error").list(crcFilter).length);
+ assertEquals(1, new File(tmp, "logged").list(crcFilter).length);
FlumeTestHarness.cleanupLocalWriteDir();
Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java Tue Jan 31 19:38:26 2012
@@ -139,8 +139,10 @@ public class TestEscapedCustomOutputDfs
*/
@Test
public void testGZipCodec() throws IOException, InterruptedException {
+ GzipCodec codec = new GzipCodec();
+ codec.setConf(FlumeConfiguration.get());
checkOutputFormat("syslog", new SyslogEntryFormat(), "GzipCodec",
- new GzipCodec());
+ codec);
}
/**
Modified: incubator/flume/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/pom.xml?rev=1238766&r1=1238765&r2=1238766&view=diff
==============================================================================
--- incubator/flume/trunk/pom.xml (original)
+++ incubator/flume/trunk/pom.xml Tue Jan 31 19:38:26 2012
@@ -27,6 +27,10 @@
<profile>
<id>full-build</id>
<activation>
+ <property>
+ <name>full-build.profile</name>
+ <value>yes</value>
+ </property>
<activeByDefault>false</activeByDefault>
</activation>
<modules>
@@ -45,6 +49,9 @@
<profile>
<id>dev</id>
<activation>
+ <property>
+ <name>!full-build.profile</name>
+ </property>
<activeByDefault>true</activeByDefault>
</activation>
<modules>
@@ -84,6 +91,67 @@
</modules>
</profile>
+ <profile>
+ <id>hadoop-0.20.2-CDH3B4</id>
+ <activation>
+ <property>
+ <name>!hadoop.profile</name>
+ </property>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <hadoop.version>0.20.2-CDH3B4</hadoop.version>
+ <hadoop.common.artifact.id>hadoop-core</hadoop.common.artifact.id>
+ <FlushingSeqFile>bar.java</FlushingSeqFile>
+ <RawSeqFile>foo.java</RawSeqFile>
+ </properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>${hadoop.common.artifact.id}</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ </profile>
+
+ <profile>
+ <id>hadoop-0.23</id>
+ <activation>
+ <property>
+ <name>hadoop.profile</name>
+ <value>23</value>
+ </property>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <properties>
+ <hadoop.version>0.23.1-SNAPSHOT</hadoop.version>
+ <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id>
+ <FlushingSeqFile>**/FlushingSequenceFileWriter.java</FlushingSeqFile>
+ <RawSeqFile>**/RawSequenceFileWriter.java</RawSeqFile>
+ </properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>${hadoop.common.artifact.id}</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ </profile>
+
</profiles>
<inceptionYear>2009</inceptionYear>
@@ -341,6 +409,15 @@
</snapshots>
</repository>
+ <repository>
+ <id>apache.staging.https</id>
+ <name>Apache Staging Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+
</repositories>
<build>
@@ -355,6 +432,10 @@
<configuration>
<source>1.6</source>
<target>1.6</target>
+ <excludes>
+ <exclude>${FlushingSeqFile}</exclude>
+ <exclude>${RawSeqFile}</exclude>
+ </excludes>
</configuration>
</plugin>
@@ -588,12 +669,6 @@
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2-CDH3B4</version>
- </dependency>
-
- <dependency>
<groupId>org.arabidopsis.ahocorasick</groupId>
<artifactId>ahocorasick</artifactId>
<version>2.x</version>