Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC
svn commit: r1152970 [14/26] - in /incubator/kafka: branches/ site/ trunk/
trunk/bin/ trunk/clients/ trunk/clients/clojure/
trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/
trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,156 @@
+package kafka.etl;
+
+import java.net.URI;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+
+@SuppressWarnings("deprecation")
+public class KafkaETLJob {
+
+ public static final String HADOOP_PREFIX = "hadoop-conf.";
+ /**
+ * Create a job configuration
+ */
+ @SuppressWarnings("rawtypes")
+ public static JobConf createJobConf(String name, String topic, Props props, Class classobj)
+ throws Exception {
+ JobConf conf = getJobConf(name, props, classobj);
+
+ conf.set("topic", topic);
+
+ // input format
+ conf.setInputFormat(KafkaETLInputFormat.class);
+
+ //turn off mapper speculative execution
+ conf.setMapSpeculativeExecution(false);
+
+ // setup multiple outputs
+ MultipleOutputs.addMultiNamedOutput(conf, "offsets", SequenceFileOutputFormat.class,
+ KafkaETLKey.class, BytesWritable.class);
+
+
+ return conf;
+ }
+
+ /**
+ * Helper function to initialize a job configuration
+ */
+ public static JobConf getJobConf(String name, Props props, Class classobj) throws Exception {
+ JobConf conf = new JobConf();
+ // set custom class loader with custom find resource strategy.
+
+ conf.setJobName(name);
+ String hadoop_ugi = props.getProperty("hadoop.job.ugi", null);
+ if (hadoop_ugi != null) {
+ conf.set("hadoop.job.ugi", hadoop_ugi);
+ }
+
+ if (props.getBoolean("is.local", false)) {
+ conf.set("mapred.job.tracker", "local");
+ conf.set("fs.default.name", "file:///");
+ conf.set("mapred.local.dir", "/tmp/map-red");
+
+ info("Running locally, no hadoop jar set.");
+ } else {
+ setClassLoaderAndJar(conf, classobj);
+ info("Setting hadoop jar file for class:" + classobj + " to " + conf.getJar());
+ info("*************************************************************************");
+ info(" Running on Real Hadoop Cluster(" + conf.get("mapred.job.tracker") + ") ");
+ info("*************************************************************************");
+ }
+
+ // set JVM options if present
+ if (props.containsKey("mapred.child.java.opts")) {
+ conf.set("mapred.child.java.opts", props.getProperty("mapred.child.java.opts"));
+ info("mapred.child.java.opts set to " + props.getProperty("mapred.child.java.opts"));
+ }
+
+ // Adds External jars to hadoop classpath
+ String externalJarList = props.getProperty("hadoop.external.jarFiles", null);
+ if (externalJarList != null) {
+ String[] jarFiles = externalJarList.split(",");
+ for (String jarFile : jarFiles) {
+        info("Adding external jar file: " + jarFile);
+ DistributedCache.addFileToClassPath(new Path(jarFile), conf);
+ }
+ }
+
+ // Adds distributed cache files
+ String cacheFileList = props.getProperty("hadoop.cache.files", null);
+ if (cacheFileList != null) {
+ String[] cacheFiles = cacheFileList.split(",");
+ for (String cacheFile : cacheFiles) {
+ info("Adding Distributed Cache File:" + cacheFile);
+ DistributedCache.addCacheFile(new URI(cacheFile), conf);
+ }
+ }
+
+    // Adds distributed cache archives
+ String archiveFileList = props.getProperty("hadoop.cache.archives", null);
+ if (archiveFileList != null) {
+ String[] archiveFiles = archiveFileList.split(",");
+ for (String archiveFile : archiveFiles) {
+ info("Adding Distributed Cache Archive File:" + archiveFile);
+ DistributedCache.addCacheArchive(new URI(archiveFile), conf);
+ }
+ }
+
+ String hadoopCacheJarDir = props.getProperty("hdfs.default.classpath.dir", null);
+ if (hadoopCacheJarDir != null) {
+ FileSystem fs = FileSystem.get(conf);
+ if (fs != null) {
+ FileStatus[] status = fs.listStatus(new Path(hadoopCacheJarDir));
+
+ if (status != null) {
+ for (int i = 0; i < status.length; ++i) {
+ if (!status[i].isDir()) {
+ Path path = new Path(hadoopCacheJarDir, status[i].getPath().getName());
+ info("Adding Jar to Distributed Cache Archive File:" + path);
+
+ DistributedCache.addFileToClassPath(path, conf);
+ }
+ }
+ } else {
+ info("hdfs.default.classpath.dir " + hadoopCacheJarDir + " is empty.");
+ }
+ } else {
+      info("Could not get filesystem for hdfs.default.classpath.dir " + hadoopCacheJarDir);
+ }
+ }
+
+ // May want to add this to HadoopUtils, but will await refactoring
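+    // copy every property whose name starts with "hadoop-conf." into the JobConf
+    // with the prefix stripped, e.g. a property "hadoop-conf.mapred.reduce.tasks"
+    // (an illustrative key) sets "mapred.reduce.tasks"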
+ for (String key : props.stringPropertyNames()) {
+ String lowerCase = key.toLowerCase();
+ if (lowerCase.startsWith(HADOOP_PREFIX)) {
+ String newKey = key.substring(HADOOP_PREFIX.length());
+ conf.set(newKey, props.getProperty(key));
+ }
+ }
+
+ KafkaETLUtils.setPropsInJob(conf, props);
+
+ return conf;
+ }
+
+ public static void info(String message) {
+ System.out.println(message);
+ }
+
+ public static void setClassLoaderAndJar(JobConf conf,
+ @SuppressWarnings("rawtypes") Class jobClass) {
+ conf.setClassLoader(Thread.currentThread().getContextClassLoader());
+ String jar = KafkaETLUtils.findContainingJar(jobClass, Thread
+ .currentThread().getContextClassLoader());
+ if (jar != null) {
+ conf.setJar(jar);
+ }
+ }
+
+}
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,88 @@
+package kafka.etl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.WritableComparable;
+
+public class KafkaETLKey implements WritableComparable<KafkaETLKey>{
+
+ protected int _inputIndex;
+ protected long _offset;
+ protected long _checksum;
+
+ /**
+ * dummy empty constructor
+ */
+ public KafkaETLKey() {
+ _inputIndex = 0;
+ _offset = 0;
+ _checksum = 0;
+ }
+
+ public KafkaETLKey (int index, long offset) {
+ _inputIndex = index;
+ _offset = offset;
+ _checksum = 0;
+ }
+
+ public KafkaETLKey (int index, long offset, long checksum) {
+ _inputIndex = index;
+ _offset = offset;
+ _checksum = checksum;
+ }
+
+ public void set(int index, long offset, long checksum) {
+ _inputIndex = index;
+ _offset = offset;
+ _checksum = checksum;
+ }
+
+ public int getIndex() {
+ return _inputIndex;
+ }
+
+ public long getOffset() {
+ return _offset;
+ }
+
+ public long getChecksum() {
+ return _checksum;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ _inputIndex = in.readInt();
+ _offset = in.readLong();
+ _checksum = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(_inputIndex);
+ out.writeLong(_offset);
+ out.writeLong(_checksum);
+ }
+
+ @Override
+ public int compareTo(KafkaETLKey o) {
+ if (_inputIndex != o._inputIndex)
+            return _inputIndex - o._inputIndex;
+ else {
+ if (_offset > o._offset) return 1;
+ else if (_offset < o._offset) return -1;
+ else {
+ if (_checksum > o._checksum) return 1;
+ else if (_checksum < o._checksum) return -1;
+ else return 0;
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "index=" + _inputIndex + " offset=" + _offset + " checksum=" + _checksum;
+ }
+
+}
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,164 @@
+package kafka.etl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+
+@SuppressWarnings({ "deprecation" })
+public class KafkaETLRecordReader
+extends SequenceFileRecordReader<KafkaETLKey, BytesWritable> {
+
+ protected Props _props; /*properties*/
+ protected JobConf _job;
+ protected Reporter _reporter ;
+ protected MultipleOutputs _mos;
+ protected List<KafkaETLContext> _contextList;
+ protected int _contextIndex ;
+
+ protected long _totalBytes;
+ protected long _readBytes;
+ protected long _readCounts;
+
+ protected String _attemptId = null;
+
+ private static long _limit = 100; /*for testing only*/
+
+ public KafkaETLRecordReader(InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ super(job, (FileSplit) split);
+
+ _props = KafkaETLUtils.getPropsFromJob(job);
+ _contextList = new ArrayList<KafkaETLContext>();
+ _job = job;
+ _reporter = reporter;
+ _contextIndex = -1;
+ _mos = new MultipleOutputs(job);
+ try {
+ _limit = _props.getInt("kafka.request.limit", -1);
+
+            /* get attempt id */
+ String taskId = _job.get("mapred.task.id");
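+            // a task attempt id looks like "attempt_200811101024_0001_m_000005_0":
+            // six "_"-separated parts, where parts[3] is the task type (m/r)
+            // and parts[4] is the task number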
+ if (taskId == null) {
+ throw new IllegalArgumentException(
+                        "Configuration does not contain the property mapred.task.id");
+ }
+ String[] parts = taskId.split("_");
+ if ( parts.length != 6 || !parts[0].equals("attempt")
+ || (!"m".equals(parts[3]) && !"r".equals(parts[3]))) {
+ throw new IllegalArgumentException(
+ "TaskAttemptId string : " + taskId + " is not properly formed");
+ }
+ _attemptId = parts[4]+parts[3];
+ }catch (Exception e) {
+ throw new IOException (e);
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ super.close();
+
+ /* now record some stats */
+ for (KafkaETLContext context: _contextList) {
+ context.output(_attemptId);
+ context.close();
+ }
+
+ _mos.close();
+ }
+
+ @Override
+ public KafkaETLKey createKey() {
+ return super.createKey();
+ }
+
+ @Override
+ public BytesWritable createValue() {
+ return super.createValue();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ if (_totalBytes == 0) return 0f;
+
+ if (_contextIndex >= _contextList.size()) return 1f;
+
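+        // with no request limit, progress is bytes read so far over total bytes
+        // across all requests; otherwise it is records read over (limit * number of requests)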
+ if (_limit < 0) {
+ double p = ( _readBytes + getContext().getReadBytes() ) / ((double) _totalBytes);
+ return (float)p;
+ }
+ else {
+ double p = (_readCounts + getContext().getCount()) / ((double)_limit * _contextList.size());
+ return (float)p;
+ }
+ }
+
+ @Override
+ public synchronized boolean next(KafkaETLKey key, BytesWritable value)
+ throws IOException {
+ try{
+ if (_contextIndex < 0) { /* first call, get all requests */
+ System.out.println("RecordReader.next init()");
+ _totalBytes = 0;
+
+ while ( super.next(key, value)) {
+                    String input = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
+ int index = _contextList.size();
+ KafkaETLContext context = new KafkaETLContext(
+ _job, _props, _reporter, _mos, index, input);
+ _contextList.add(context);
+ _totalBytes += context.getTotalBytes();
+ }
+ System.out.println("Number of requests=" + _contextList.size());
+
+ _readBytes = 0;
+ _readCounts = 0;
+ _contextIndex = 0;
+ }
+
+ while (_contextIndex < _contextList.size()) {
+
+ KafkaETLContext currContext = getContext();
+
+ while (currContext.hasMore() &&
+ (_limit < 0 || currContext.getCount() < _limit)) {
+
+ if (currContext.getNext(key, value)) {
+ //System.out.println("RecordReader.next get (key,value)");
+ return true;
+ }
+ else {
+ //System.out.println("RecordReader.next fetch more");
+ currContext.fetchMore();
+ }
+ }
+
+ _readBytes += currContext.getReadBytes();
+ _readCounts += currContext.getCount();
+ _contextIndex ++;
+ System.out.println("RecordReader.next will get from request " + _contextIndex);
+ }
+ }catch (Exception e) {
+ throw new IOException (e);
+ }
+ return false;
+ }
+
+ protected KafkaETLContext getContext() throws IOException{
+ if (_contextIndex >= _contextList.size())
+            throw new IOException ("context index " + _contextIndex + " is out of bounds "
+ + _contextList.size());
+ return _contextList.get(_contextIndex);
+ }
+
+
+
+}
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,112 @@
+package kafka.etl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+public class KafkaETLRequest {
+ public static long DEFAULT_OFFSET = -1;
+ public static String DELIM = "\t";
+
+ String _topic;
+ URI _uri;
+ int _partition;
+ long _offset = DEFAULT_OFFSET;
+
+ public KafkaETLRequest() {
+
+ }
+
+ public KafkaETLRequest(String input) throws IOException {
+ //System.out.println("Init request from " + input);
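+        // expected form (tab-separated): "tcp://localhost:9092\tSimpleTestEvent\t0\t-1",
+        // i.e. uri, topic, partition, offset (illustrative values; -1 is DEFAULT_OFFSET)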
+ String[] pieces = input.trim().split(DELIM);
+ if (pieces.length != 4)
+ throw new IOException( input +
+ " : input must be in the form 'url" + DELIM +
+ "topic" + DELIM +"partition" + DELIM +"offset'");
+
+ try {
+ _uri = new URI (pieces[0]);
+ }catch (java.net.URISyntaxException e) {
+ throw new IOException (e);
+ }
+ _topic = pieces[1];
+ _partition = Integer.valueOf(pieces[2]);
+ _offset = Long.valueOf(pieces[3]);
+ }
+
+ public KafkaETLRequest(String node, String topic, String partition, String offset,
+ Map<String, String> nodes) throws IOException {
+
+ Integer nodeId = Integer.parseInt(node);
+ String uri = nodes.get(nodeId.toString());
+        if (uri == null) throw new IOException ("Cannot find URI for node id " + nodeId);
+
+ try {
+ _uri = new URI (uri);
+ }catch (java.net.URISyntaxException e) {
+ throw new IOException (e);
+ }
+ _topic = topic;
+ _partition = Integer.valueOf(partition);
+ _offset = Long.valueOf(offset);
+ }
+
+ public KafkaETLRequest(String topic, String uri, int partition) throws URISyntaxException {
+ _topic = topic;
+ _uri = new URI(uri);
+ _partition = partition;
+ }
+
+ public void setDefaultOffset() {
+ _offset = DEFAULT_OFFSET;
+ }
+
+ public void setOffset(long offset) {
+ _offset = offset;
+ }
+
+ public String getTopic() { return _topic;}
+ public URI getURI () { return _uri;}
+ public int getPartition() { return _partition;}
+
+ public long getOffset() { return _offset;}
+
+ public boolean isValidOffset() {
+ return _offset >= 0;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (! (o instanceof KafkaETLRequest))
+ return false;
+
+ KafkaETLRequest r = (KafkaETLRequest) o;
+        return this._topic.equals(r._topic) &&
+               this._uri.equals(r._uri) &&
+               this._partition == r._partition;
+ }
+
+ @Override
+ public int hashCode() {
+ return toString(0).hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return toString(_offset);
+ }
+
+
+ public String toString (long offset) {
+
+ return
+ _uri + DELIM +
+ _topic + DELIM +
+ _partition + DELIM +
+ offset;
+ }
+
+
+}
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.etl;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.BytesWritable;
+
+public class KafkaETLUtils {
+
+ public static PathFilter PATH_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return !path.getName().startsWith("_")
+ && !path.getName().startsWith(".");
+ }
+ };
+
+
+ public static Path getLastPath(Path path, FileSystem fs) throws IOException {
+
+ FileStatus[] statuses = fs.listStatus(path, PATH_FILTER);
+
+ if (statuses.length == 0) {
+ return path;
+ } else {
+ Arrays.sort(statuses);
+ return statuses[statuses.length - 1].getPath();
+ }
+ }
+
+ public static String getFileName(Path path) throws IOException {
+ String fullname = path.toUri().toString();
+ String[] parts = fullname.split(Path.SEPARATOR);
+ if (parts.length < 1)
+ throw new IOException("Invalid path " + fullname);
+ return parts[parts.length - 1];
+ }
+
+ public static List<String> readText(FileSystem fs, String inputFile)
+ throws IOException, FileNotFoundException {
+ Path path = new Path(inputFile);
+ return readText(fs, path);
+ }
+
+ public static List<String> readText(FileSystem fs, Path path)
+ throws IOException, FileNotFoundException {
+ if (!fs.exists(path)) {
+ throw new FileNotFoundException("File " + path + " doesn't exist!");
+ }
+ BufferedReader in = new BufferedReader(new InputStreamReader(
+ fs.open(path)));
+ List<String> buf = new ArrayList<String>();
+ String line = null;
+
+ while ((line = in.readLine()) != null) {
+ if (line.trim().length() > 0)
+                buf.add(line.trim());
+ }
+ in.close();
+ return buf;
+ }
+
+ public static void writeText(FileSystem fs, Path outPath, String content)
+ throws IOException {
+ long timestamp = System.currentTimeMillis();
+ String localFile = "/tmp/KafkaETL_tmp_" + timestamp;
+ PrintWriter writer = new PrintWriter(new FileWriter(localFile));
+ writer.println(content);
+ writer.close();
+
+ Path src = new Path(localFile);
+ fs.moveFromLocalFile(src, outPath);
+ }
+
+ public static Props getPropsFromJob(Configuration conf) {
+ String propsString = conf.get("kafka.etl.props");
+ if (propsString == null)
+ throw new UndefinedPropertyException(
+ "The required property kafka.etl.props was not found in the Configuration.");
+ try {
+ ByteArrayInputStream input = new ByteArrayInputStream(
+ propsString.getBytes("UTF-8"));
+ Properties properties = new Properties();
+ properties.load(input);
+ return new Props(properties);
+ } catch (IOException e) {
+ throw new RuntimeException("This is not possible!", e);
+ }
+ }
+
+ public static void setPropsInJob(Configuration conf, Props props)
+ {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ try
+ {
+ props.store(output);
+ conf.set("kafka.etl.props", new String(output.toByteArray(), "UTF-8"));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("This is not possible!", e);
+ }
+ }
+
+ public static Props readProps(String file) throws IOException {
+ Path path = new Path(file);
+ FileSystem fs = path.getFileSystem(new Configuration());
+ if (fs.exists(path)) {
+ InputStream input = fs.open(path);
+ try {
+ // wrap it up in another layer so that the user can override
+ // properties
+ Props p = new Props(input);
+ return new Props(p);
+ } finally {
+ input.close();
+ }
+ } else {
+ return new Props();
+ }
+ }
+
+ public static String findContainingJar(
+ @SuppressWarnings("rawtypes") Class my_class, ClassLoader loader) {
+ String class_file = my_class.getName().replaceAll("\\.", "/")
+ + ".class";
+ return findContainingJar(class_file, loader);
+ }
+
+ public static String findContainingJar(String fileName, ClassLoader loader) {
+ try {
+ for (@SuppressWarnings("rawtypes")
+ Enumeration itr = loader.getResources(fileName); itr
+ .hasMoreElements();) {
+ URL url = (URL) itr.nextElement();
+ // logger.info("findContainingJar finds url:" + url);
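+                // a jar resource URL has the form "jar:file:/path/to/app.jar!/kafka/etl/Props.class";
+                // getPath() yields "file:/path/to/app.jar!/...", so strip the "file:" prefix
+                // and the "!..." suffix to recover the jar's filesystem path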
+ if ("jar".equals(url.getProtocol())) {
+ String toReturn = url.getPath();
+ if (toReturn.startsWith("file:")) {
+ toReturn = toReturn.substring("file:".length());
+ }
+ toReturn = URLDecoder.decode(toReturn, "UTF-8");
+ return toReturn.replaceAll("!.*$", "");
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+
+ public static byte[] getBytes(BytesWritable val) {
+
+ byte[] buffer = val.getBytes();
+
+        /* FIXME: remove the following part once the JIRA below is fixed
+ * https://issues.apache.org/jira/browse/HADOOP-6298
+ */
+ long len = val.getLength();
+ byte [] bytes = buffer;
+ if (len < buffer.length) {
+ bytes = new byte[(int) len];
+ System.arraycopy(buffer, 0, bytes, 0, (int)len);
+ }
+
+ return bytes;
+ }
+
+}
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,460 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.etl;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.log4j.Logger;
+
+public class Props extends Properties {
+
+ private static final long serialVersionUID = 1L;
+ private static Logger logger = Logger.getLogger(Props.class);
+
+ /**
+ * default constructor
+ */
+ public Props() {
+ super();
+ }
+
+ /**
+ * copy constructor
+ * @param props
+ */
+ public Props(Props props) {
+ if (props != null) {
+ this.put(props);
+ }
+ }
+
+ /**
+ * construct props from a list of files
+ * @param files paths of files
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ public Props(String... files) throws FileNotFoundException, IOException {
+ this(Arrays.asList(files));
+ }
+
+ /**
+ * construct props from a list of files
+ * @param files paths of files
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ public Props(List<String> files) throws FileNotFoundException, IOException {
+
+ for (int i = 0; i < files.size(); i++) {
+ InputStream input = new BufferedInputStream(new FileInputStream(
+ new File(files.get(i)).getAbsolutePath()));
+ super.load(input);
+ input.close();
+ }
+ }
+
+ /**
+ * construct props from a list of input streams
+ * @param inputStreams
+ * @throws IOException
+ */
+ public Props(InputStream... inputStreams) throws IOException {
+ for (InputStream stream : inputStreams)
+ super.load(stream);
+ }
+
+ /**
+ * construct props from a list of maps
+ * @param props
+ */
+ public Props(Map<String, String>... props) {
+ for (int i = props.length - 1; i >= 0; i--)
+ super.putAll(props[i]);
+ }
+
+ /**
+ * construct props from a list of Properties
+ * @param properties
+ */
+ public Props(Properties... properties) {
+ for (int i = properties.length - 1; i >= 0; i--){
+ this.put(properties[i]);
+ }
+ }
+
+ /**
+     * build props from a list of strings, interpreting them as
+     * key, value, key, value, ...
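+     * e.g. Props.of("a", "1", "b", "2") yields properties {a=1, b=2}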
+ *
+ * @param args
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ public static Props of(String... args) {
+ if (args.length % 2 != 0)
+ throw new IllegalArgumentException(
+ "Must have an equal number of keys and values.");
+ Map<String, String> vals = new HashMap<String, String>(args.length / 2);
+ for (int i = 0; i < args.length; i += 2)
+ vals.put(args[i], args[i + 1]);
+ return new Props(vals);
+ }
+
+ /**
+ * Put the given Properties into the Props.
+ *
+ * @param properties
+ * The properties to put
+ *
+ */
+ public void put(Properties properties) {
+ for (String propName : properties.stringPropertyNames()) {
+ super.put(propName, properties.getProperty(propName));
+ }
+ }
+
+ /**
+     * get property of "key" and split the value by "," (ignoring surrounding whitespace)
+ * @param key
+ * @return
+ */
+ public List<String> getStringList(String key) {
+ return getStringList(key, "\\s*,\\s*");
+ }
+
+ /**
+ * get property of "key" and split the value by "sep"
+ * @param key
+ * @param sep
+ * @return
+ */
+ public List<String> getStringList(String key, String sep) {
+ String val = super.getProperty(key);
+ if (val == null || val.trim().length() == 0)
+ return Collections.emptyList();
+
+ if (containsKey(key))
+ return Arrays.asList(val.split(sep));
+ else
+ throw new UndefinedPropertyException("Missing required property '"
+ + key + "'");
+ }
+
+ /**
+ * get string list with default value. default delimiter is ","
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ public List<String> getStringList(String key, List<String> defaultValue) {
+ if (containsKey(key))
+ return getStringList(key);
+ else
+ return defaultValue;
+ }
+
+ /**
+ * get string list with default value
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ public List<String> getStringList(String key, List<String> defaultValue,
+ String sep) {
+ if (containsKey(key))
+ return getStringList(key, sep);
+ else
+ return defaultValue;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T> T getValue(String key, T defaultValue)
+ throws Exception {
+
+ if (containsKey(key)) {
+ Object value = super.get(key);
+ if (value.getClass().isInstance(defaultValue)) {
+ return (T)value;
+ } else if (value instanceof String) {
+ // call constructor(String) to initialize it
+ @SuppressWarnings("rawtypes")
+ Constructor ct = defaultValue.getClass().getConstructor(String.class);
+ String v = ((String)value).trim();
+ Object ret = ct.newInstance(v);
+ return (T) ret;
+ }
+ else throw new UndefinedPropertyException ("Property " + key +
+ ": cannot convert value of " + value.getClass().getName() +
+ " to " + defaultValue.getClass().getName());
+ }
+ else {
+ return defaultValue;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T> T getValue(String key, Class<T> mclass)
+ throws Exception {
+
+ if (containsKey(key)) {
+ Object value = super.get(key);
+ if (value.getClass().equals(mclass)) {
+ return (T)value;
+ } else if (value instanceof String) {
+ // call constructor(String) to initialize it
+ @SuppressWarnings("rawtypes")
+ Constructor ct = mclass.getConstructor(String.class);
+ String v = ((String)value).trim();
+ Object ret = ct.newInstance(v);
+ return (T) ret;
+ }
+ else throw new UndefinedPropertyException ("Property " + key +
+ ": cannot convert value of " + value.getClass().getName() +
+                    " to " + mclass.getName());
+ }
+ else {
+ throw new UndefinedPropertyException ("Missing required property '"
+ + key + "'");
+ }
+ }
+
+ /**
+ * get boolean value
+ * @param key
+ * @param defaultValue
+ * @return
+ * @throws Exception if value is not of type boolean or string
+ */
+ public Boolean getBoolean(String key, Boolean defaultValue)
+ throws Exception {
+ return getValue (key, defaultValue);
+ }
+
+ /**
+ * get boolean value
+ * @param key
+ * @return
+ * @throws Exception if value is not of type boolean or string or
+ * if value doesn't exist
+ */
+ public Boolean getBoolean(String key) throws Exception {
+ return getValue (key, Boolean.class);
+ }
+
+ /**
+ * get long value
+     * @param name
+ * @param defaultValue
+ * @return
+ * @throws Exception if value is not of type long or string
+ */
+ public Long getLong(String name, Long defaultValue)
+ throws Exception {
+ return getValue(name, defaultValue);
+ }
+
+ /**
+ * get long value
+     * @param name
+ * @return
+ * @throws Exception if value is not of type long or string or
+ * if value doesn't exist
+ */
+ public Long getLong(String name) throws Exception {
+ return getValue (name, Long.class);
+ }
+
+ /**
+ * get integer value
+     * @param name
+ * @param defaultValue
+ * @return
+ * @throws Exception if value is not of type integer or string
+ */
+ public Integer getInt(String name, Integer defaultValue)
+ throws Exception {
+ return getValue(name, defaultValue);
+ }
+
+ /**
+ * get integer value
+     * @param name
+ * @return
+ * @throws Exception if value is not of type integer or string or
+ * if value doesn't exist
+ */
+ public Integer getInt(String name) throws Exception {
+ return getValue (name, Integer.class);
+ }
+
+ /**
+ * get double value
+     * @param name
+ * @param defaultValue
+ * @return
+ * @throws Exception if value is not of type double or string
+ */
+ public Double getDouble(String name, double defaultValue)
+ throws Exception {
+ return getValue(name, defaultValue);
+ }
+
+ /**
+ * get double value
+     * @param name
+ * @return
+ * @throws Exception if value is not of type double or string or
+ * if value doesn't exist
+ */
+ public double getDouble(String name) throws Exception {
+ return getValue(name, Double.class);
+ }
+
+ /**
+ * get URI value
+     * @param name
+ * @param defaultValue
+ * @return
+ * @throws Exception if value is not of type URI or string
+ */
+ public URI getUri(String name, URI defaultValue) throws Exception {
+ return getValue(name, defaultValue);
+ }
+
+ /**
+ * get URI value
+     * @param name
+ * @param defaultValue
+ * @return
+ * @throws Exception if value is not of type URI or string
+ */
+ public URI getUri(String name, String defaultValue)
+ throws Exception {
+ URI defaultV = new URI(defaultValue);
+ return getValue(name, defaultV);
+ }
+
+ /**
+ * get URI value
+     * @param name
+ * @return
+ * @throws Exception if value is not of type URI or string or
+ * if value doesn't exist
+ */
+ public URI getUri(String name) throws Exception {
+ return getValue(name, URI.class);
+ }
+
+ /**
+ * compare two props
+ * @param p
+ * @return
+ */
+ public boolean equalsProps(Props p) {
+ if (p == null) {
+ return false;
+ }
+
+ final Set<String> myKeySet = getKeySet();
+ for (String s : myKeySet) {
+ if (!get(s).equals(p.get(s))) {
+ return false;
+ }
+ }
+
+ return myKeySet.size() == p.getKeySet().size();
+ }
+
+
+ /**
+ * Get a map of all properties by string prefix
+ *
+ * @param prefix
+ * The string prefix
+ */
+ public Map<String, String> getMapByPrefix(String prefix) {
+ Map<String, String> values = new HashMap<String, String>();
+
+ for (String key : super.stringPropertyNames()) {
+ if (key.startsWith(prefix)) {
+ values.put(key.substring(prefix.length()), super.getProperty(key));
+ }
+ }
+ return values;
+ }
+
+ /**
+ * Store all properties
+ *
+ * @param out The stream to write to
+ * @throws IOException If there is an error writing
+ */
+ public void store(OutputStream out) throws IOException {
+ super.store(out, null);
+ }
+
+ /**
+ * get all property names
+ * @return
+ */
+ public Set<String> getKeySet() {
+ return super.stringPropertyNames();
+ }
+
+ /**
+ * log properties
+ * @param comment
+ */
+ public void logProperties(String comment) {
+ logger.info(comment);
+
+ for (String key : getKeySet()) {
+ logger.info(" key=" + key + " value=" + get(key));
+ }
+ }
+
+ /**
+ * clone a Props
+ * @param p
+ * @return
+ */
+ public static Props clone(Props p) {
+ return new Props(p);
+ }
+
+
+}
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.etl;
+
+public class UndefinedPropertyException extends RuntimeException {
+
+ private static final long serialVersionUID = 1;
+
+ public UndefinedPropertyException(String message) {
+ super(message);
+ }
+
+}
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.etl.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import kafka.message.NoCompressionCodec;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+
+import kafka.etl.KafkaETLKey;
+import kafka.etl.KafkaETLRequest;
+import kafka.etl.KafkaETLUtils;
+import kafka.etl.Props;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.javaapi.producer.SyncProducer;
+import kafka.producer.SyncProducerConfig;
+
+/**
+ * Use this class to produce test events to a Kafka server. Each event contains a
+ * random timestamp in text format.
+ */
+@SuppressWarnings("deprecation")
+public class DataGenerator {
+
+ protected final static Random RANDOM = new Random(
+ System.currentTimeMillis());
+
+ protected Props _props;
+ protected SyncProducer _producer = null;
+ protected URI _uri = null;
+ protected String _topic;
+ protected int _count;
+ protected String _offsetsDir;
+ protected final int TCP_BUFFER_SIZE = 300 * 1000;
+ protected final int CONNECT_TIMEOUT = 20000; // ms
+ protected final int RECONNECT_INTERVAL = Integer.MAX_VALUE; // ms
+
+ public DataGenerator(String id, Props props) throws Exception {
+ _props = props;
+ _topic = props.getProperty("kafka.etl.topic");
+ System.out.println("topics=" + _topic);
+ _count = props.getInt("event.count");
+
+ _offsetsDir = _props.getProperty("input");
+
+ // initialize kafka producer to generate count events
+ String serverUri = _props.getProperty("kafka.server.uri");
+ _uri = new URI (serverUri);
+
+ System.out.println("server uri:" + _uri.toString());
+ Properties producerProps = new Properties();
+ producerProps.put("host", _uri.getHost());
+ producerProps.put("port", String.valueOf(_uri.getPort()));
+ producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE));
+ producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
+ producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
+ _producer = new SyncProducer(new SyncProducerConfig(producerProps));
+
+ }
+
+ public void run() throws Exception {
+
+ List<Message> list = new ArrayList<Message>();
+ for (int i = 0; i < _count; i++) {
+ Long timestamp = RANDOM.nextLong();
+ if (timestamp < 0) timestamp = -timestamp;
+ byte[] bytes = timestamp.toString().getBytes("UTF8");
+ Message message = new Message(bytes);
+ list.add(message);
+ }
+ // send events
+ System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
+ _producer.send(_topic, new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, list));
+
+ // close the producer
+ _producer.close();
+
+ // generate offset files
+ generateOffsets();
+ }
+
+ protected void generateOffsets() throws Exception {
+ JobConf conf = new JobConf();
+ conf.set("hadoop.job.ugi", _props.getProperty("hadoop.job.ugi"));
+ conf.setCompressMapOutput(false);
+ Path outPath = new Path(_offsetsDir + Path.SEPARATOR + "1.dat");
+ FileSystem fs = outPath.getFileSystem(conf);
+ if (fs.exists(outPath)) fs.delete(outPath);
+
+ KafkaETLRequest request =
+ new KafkaETLRequest(_topic, "tcp://" + _uri.getHost() + ":" + _uri.getPort(), 0);
+
+ System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString());
+ byte[] bytes = request.toString().getBytes("UTF-8");
+ KafkaETLKey dummyKey = new KafkaETLKey();
+ SequenceFile.setCompressionType(conf, SequenceFile.CompressionType.NONE);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outPath,
+ KafkaETLKey.class, BytesWritable.class);
+ writer.append(dummyKey, new BytesWritable(bytes));
+ writer.close();
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ if (args.length < 1)
+ throw new Exception("Usage: - config_file");
+
+ Props props = new Props(args[0]);
+ DataGenerator job = new DataGenerator("DataGenerator", props);
+ job.run();
+ }
+
+}
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.etl.impl;
+
+import kafka.etl.KafkaETLInputFormat;
+import kafka.etl.KafkaETLJob;
+import kafka.etl.Props;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+/**
+ * This is a simple Kafka ETL job which pulls text events generated by
+ * DataGenerator and stores them in HDFS.
+ */
+@SuppressWarnings("deprecation")
+public class SimpleKafkaETLJob {
+
+ protected String _name;
+ protected Props _props;
+ protected String _input;
+ protected String _output;
+ protected String _topic;
+
+ public SimpleKafkaETLJob(String name, Props props) throws Exception {
+ _name = name;
+ _props = props;
+
+ _input = _props.getProperty("input");
+ _output = _props.getProperty("output");
+
+ _topic = props.getProperty("kafka.etl.topic");
+ }
+
+
+ protected JobConf createJobConf() throws Exception {
+        JobConf jobConf = KafkaETLJob.createJobConf("SimpleKafkaETL", _topic, _props, getClass());
+
+ jobConf.setMapperClass(SimpleKafkaETLMapper.class);
+ KafkaETLInputFormat.setInputPaths(jobConf, new Path(_input));
+
+ jobConf.setOutputKeyClass(LongWritable.class);
+ jobConf.setOutputValueClass(Text.class);
+ jobConf.setOutputFormat(TextOutputFormat.class);
+ TextOutputFormat.setCompressOutput(jobConf, false);
+ Path output = new Path(_output);
+ FileSystem fs = output.getFileSystem(jobConf);
+ if (fs.exists(output)) fs.delete(output);
+ TextOutputFormat.setOutputPath(jobConf, output);
+
+ jobConf.setNumReduceTasks(0);
+ return jobConf;
+ }
+
+ public void execute () throws Exception {
+ JobConf conf = createJobConf();
+ RunningJob runningJob = new JobClient(conf).submitJob(conf);
+ String id = runningJob.getJobID();
+ System.out.println("Hadoop job id=" + id);
+ runningJob.waitForCompletion();
+
+ if (!runningJob.isSuccessful())
+ throw new Exception("Hadoop ETL job failed! Please check status on http://"
+ + conf.get("mapred.job.tracker") + "/jobdetails.jsp?jobid=" + id);
+ }
+
+ /**
+ * for testing only
+ *
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+
+ if (args.length < 1)
+ throw new Exception("Usage: - config_file");
+
+ Props props = new Props(args[0]);
+ SimpleKafkaETLJob job = new SimpleKafkaETLJob("SimpleKafkaETLJob",
+ props);
+ job.execute();
+ }
+
+}
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.etl.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import kafka.etl.KafkaETLKey;
+import kafka.etl.KafkaETLUtils;
+import kafka.message.Message;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Simple implementation of KafkaETLMapper. It assumes that
+ * input data are text timestamps (long).
+ */
+@SuppressWarnings("deprecation")
+public class SimpleKafkaETLMapper implements
+Mapper<KafkaETLKey, BytesWritable, LongWritable, Text> {
+
+ protected long _count = 0;
+
+ protected Text getData(Message message) throws IOException {
+ ByteBuffer buf = message.payload();
+
+ byte[] array = new byte[buf.limit()];
+ buf.get(array);
+
+ Text text = new Text( new String(array, "UTF8"));
+ return text;
+ }
+
+
+ @Override
+ public void map(KafkaETLKey key, BytesWritable val,
+ OutputCollector<LongWritable, Text> collector,
+ Reporter reporter) throws IOException {
+
+
+ byte[] bytes = KafkaETLUtils.getBytes(val);
+
+ //check the checksum of message
+ Message message = new Message(bytes);
+ long checksum = key.getChecksum();
+ if (checksum != message.checksum())
+ throw new IOException ("Invalid message checksum "
+ + message.checksum() + ". Expected " + key + ".");
+ Text data = getData (message);
+ _count ++;
+
+ collector.collect(new LongWritable (_count), data);
+
+ }
+
+
+ @Override
+ public void configure(JobConf arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added: incubator/kafka/trunk/contrib/hadoop-consumer/test/test.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/test/test.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/test/test.properties (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/test/test.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,28 @@
+# name of test topic
+kafka.etl.topic=SimpleTestEvent
+
+# hdfs location of jars
+hdfs.default.classpath.dir=/tmp/kafka/lib
+
+# number of test events to be generated
+event.count=1000
+
+# hadoop id and group
+hadoop.job.ugi=kafka,hadoop
+
+# kafka server uri
+kafka.server.uri=tcp://localhost:9092
+
+# hdfs location of input directory
+input=/tmp/kafka/data
+
+# hdfs location of output directory
+output=/tmp/kafka/output
+
+# limit the number of events to be fetched;
+# value -1 means no limitation
+kafka.request.limit=-1
+
+# kafka parameters
+client.buffer.size=1048576
+client.so.timeout=60000
Added: incubator/kafka/trunk/contrib/hadoop-producer/README.md
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/README.md?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/README.md (added)
+++ incubator/kafka/trunk/contrib/hadoop-producer/README.md Mon Aug 1 23:41:24 2011
@@ -0,0 +1,134 @@
+Hadoop to Kafka Bridge
+======================
+
+What is it?
+-----------
+
+The Hadoop to Kafka bridge is a way to publish data from Hadoop to Kafka. There
+are two possible mechanisms, ranging from easy to more involved: writing a Pig
+script that emits messages in Avro format, or rolling your own job using the
+Kafka `OutputFormat`.
+
+Note that there are no exactly-once semantics: any client of the data must
+handle messages in an idempotent manner. That is, because of node failures and
+Hadoop's failure recovery, the same message may be published multiple times in
+the same push.
+
+How do I use it?
+----------------
+
+With this bridge, Kafka topics are URIs and are specified as
+`kafka://<kafka-server>/<kafka-topic>`.
+
+### Pig ###
+
+The Pig bridge writes data in binary Avro format, creating one message per
+input row. To push data via Kafka, store to the Kafka URI using `AvroKafkaStorage`
+with the Avro schema as its first argument. You'll need to register the
+appropriate Kafka JARs. Here is what an example Pig script looks like:
+
+ REGISTER hadoop-kafka-bridge-0.5.2.jar;
+ REGISTER avro-1.4.0.jar;
+ REGISTER piggybank.jar;
+ REGISTER kafka-0.5.2.jar;
+ REGISTER jackson-core-asl-1.5.5.jar;
+ REGISTER jackson-mapper-asl-1.5.5.jar;
+ REGISTER scala-library.jar;
+
+ member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray);
+ names = FOREACH member_info GENERATE name;
+    STORE names INTO 'kafka://my-broker:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
+
+That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert
+from Pig's data model to the specified Avro schema.
+
+Further, multi-store is possible with KafkaStorage, so you can easily write to
+multiple topics and brokers in the same job:
+
+ SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
+ STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
+ STORE others INTO 'kafka://my-broker:9092/others' USING AvroKafkaStorage('$schema');
+
+### KafkaOutputFormat ###
+
+KafkaOutputFormat is a Hadoop OutputFormat for publishing data via Kafka. It
+uses the newer 0.20 mapreduce APIs and simply pushes bytes (i.e.,
+BytesWritable). This is a lower-level method of publishing data, as it allows
+you to precisely control output.
+
+Here is an example that publishes some input text. With KafkaOutputFormat, the
+key is a NullWritable and is ignored; only values are published. Speculative
+execution is turned off by the OutputFormat.
+
+ import kafka.bridge.hadoop.KafkaOutputFormat;
+
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.BytesWritable;
+ import org.apache.hadoop.io.NullWritable;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapreduce.Job;
+ import org.apache.hadoop.mapreduce.Mapper;
+ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+ import java.io.IOException;
+
+ public class TextPublisher
+ {
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length != 2) {
+ System.err.println("usage: <input path> <kafka output url>");
+ return;
+ }
+
+ Job job = new Job();
+
+ job.setJarByClass(TextPublisher.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(BytesWritable.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(KafkaOutputFormat.class);
+
+ job.setMapperClass(TheMapper.class);
+ job.setNumReduceTasks(0);
+
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ KafkaOutputFormat.setOutputPath(job, new Path(args[1]));
+
+ if (!job.waitForCompletion(true)) {
+ throw new RuntimeException("Job failed!");
+ }
+ }
+
+ public static class TheMapper extends Mapper<Object, Object, NullWritable, BytesWritable>
+ {
+ @Override
+ protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
+ {
+ context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
+ }
+ }
+ }
+
+What can I tune?
+----------------
+
+Normally, you needn't change any of these parameters:
+
+* kafka.output.queue_size: Bytes to queue in memory before pushing to the Kafka
+ producer (i.e., the batch size). Default is 10*1024*1024 (10MB).
+* kafka.output.connect_timeout: Connection timeout in milliseconds (see Kafka
+ producer docs). Default is 30*1000 (30s).
+* kafka.output.reconnect_timeout: Milliseconds to wait until attempting
+ reconnection (see Kafka producer docs). Default is 1000 (1s).
+* kafka.output.bufsize: Producer buffer size in bytes (see Kafka producer
+ docs). Default is 64*1024 (64KB).
+* kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer
+ docs). Default is 1024*1024 (1MB).
+
+For easier debugging, the above values as well as the Kafka URI
+(kafka.output.url), the output server (kafka.output.server), the topic
+(kafka.output.topic), and the schema (kafka.output.schema) are injected into
+the job's configuration.
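+
+As a rough sketch (this assumes only the standard Hadoop `Configuration` API;
+the values shown are just the defaults listed above), you could override these
+keys in your job driver before submission:
+
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    // batch up to 10MB in memory before handing off to the Kafka producer
+    conf.setInt("kafka.output.queue_size", 10 * 1024 * 1024);
+    // give up on connecting to the broker after 30s
+    conf.setInt("kafka.output.connect_timeout", 30 * 1000);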
+
Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/avro-1.4.0.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/avro-1.4.0.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/avro-1.4.0.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/commons-logging-1.0.4.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/commons-logging-1.0.4.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/commons-logging-1.0.4.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/hadoop-0.20.2-core.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/hadoop-0.20.2-core.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/hadoop-0.20.2-core.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-core-asl-1.5.5.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-core-asl-1.5.5.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-core-asl-1.5.5.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-mapper-asl-1.5.5.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-mapper-asl-1.5.5.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-mapper-asl-1.5.5.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/pig-0.8.0-core.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/pig-0.8.0-core.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/pig-0.8.0-core.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/piggybank.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/piggybank.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/piggybank.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.bridge.examples;
+
+import kafka.bridge.hadoop.KafkaOutputFormat;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import java.io.IOException;
+
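+/**
+ * Example map-only job: reads text files from <input path> and publishes each
+ * line to the Kafka topic named by <kafka output url>.
+ */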
+public class TextPublisher
+{
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length != 2) {
+ System.err.println("usage: <input path> <kafka output url>");
+ return;
+ }
+
+ Job job = new Job();
+
+ job.setJarByClass(TextPublisher.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(BytesWritable.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(KafkaOutputFormat.class);
+
+ job.setMapperClass(TheMapper.class);
+ job.setNumReduceTasks(0);
+
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ KafkaOutputFormat.setOutputPath(job, new Path(args[1]));
+
+ if (!job.waitForCompletion(true)) {
+ throw new RuntimeException("Job failed!");
+ }
+ }
+
+ public static class TheMapper extends Mapper<Object, Object, NullWritable, BytesWritable>
+ {
+ @Override
+ protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
+ {
+ // copy only getLength() bytes: Text.getBytes() returns the full backing array
+ Text text = (Text) value;
+ byte[] bytes = new byte[text.getLength()];
+ System.arraycopy(text.getBytes(), 0, bytes, 0, text.getLength());
+ context.write(NullWritable.get(), new BytesWritable(bytes));
+ }
+ }
+}
+
Added: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.bridge.hadoop;
+
+import java.util.Properties;
+
+import kafka.javaapi.producer.SyncProducer;
+import kafka.producer.SyncProducerConfig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+import java.io.IOException;
+import java.net.URI;
+
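+/**
+ * OutputFormat that publishes records to a Kafka topic. The broker host, port,
+ * and topic are parsed from the job's kafka.output.url setting, e.g.
+ * kafka://<host>:<port>/<topic>.
+ */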
+public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<NullWritable, W>
+{
+ public static final String KAFKA_URL = "kafka.output.url";
+ public static final int KAFKA_PRODUCER_CONNECT_TIMEOUT = 30*1000;
+ public static final int KAFKA_PRODUCER_RECONNECT_INTERVAL = 1000;
+ public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64*1024;
+ public static final int KAFKA_PRODUCER_MAX_MESSAGE_SIZE = 1024*1024;
+ public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
+
+ public KafkaOutputFormat()
+ {
+ super();
+ }
+
+ public static void setOutputPath(Job job, Path outputUrl)
+ {
+ job.getConfiguration().set(KafkaOutputFormat.KAFKA_URL, outputUrl.toString());
+
+ job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
+ job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
+ }
+
+ public static Path getOutputPath(JobContext job)
+ {
+ String name = job.getConfiguration().get(KafkaOutputFormat.KAFKA_URL);
+ return name == null ? null : new Path(name);
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException
+ {
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+ {
+ // Is there a programmatic way to get the temp dir? I see it hardcoded everywhere in Hadoop, Hive, and Pig.
+ return new FileOutputCommitter(new Path("/tmp/" + taskAttemptContext.getTaskAttemptID().getJobID().toString()), taskAttemptContext);
+ }
+
+ @Override
+ public RecordWriter<NullWritable, W> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ Path outputPath = getOutputPath(context);
+ if (outputPath == null)
+ throw new IllegalArgumentException("no kafka output url specified");
+ URI uri = outputPath.toUri();
+ Configuration job = context.getConfiguration();
+
+ // validate the URL (expected form: kafka://<host>:<port>/<topic>) before
+ // using its parts; URI.getHost() returns null when absent, so a bare
+ // isEmpty() check would throw a NullPointerException
+ if (uri.getHost() == null || uri.getHost().isEmpty())
+ throw new IllegalArgumentException("missing kafka server");
+ if (uri.getPath() == null || uri.getPath().length() <= 1)
+ throw new IllegalArgumentException("missing kafka topic");
+
+ final String topic = uri.getPath().substring(1); // ignore the initial '/' in the path
+
+ final int queueSize = job.getInt("kafka.output.queue_size", KAFKA_QUEUE_SIZE);
+ final int timeout = job.getInt("kafka.output.connect_timeout", KAFKA_PRODUCER_CONNECT_TIMEOUT);
+ final int interval = job.getInt("kafka.output.reconnect_interval", KAFKA_PRODUCER_RECONNECT_INTERVAL);
+ final int bufSize = job.getInt("kafka.output.bufsize", KAFKA_PRODUCER_BUFFER_SIZE);
+ final int maxSize = job.getInt("kafka.output.max_msgsize", KAFKA_PRODUCER_MAX_MESSAGE_SIZE);
+
+ // record the effective settings in the job configuration for easier debugging
+ job.set("kafka.output.server", String.format("%s:%d", uri.getHost(), uri.getPort()));
+ job.set("kafka.output.topic", topic);
+ job.setInt("kafka.output.queue_size", queueSize);
+ job.setInt("kafka.output.connect_timeout", timeout);
+ job.setInt("kafka.output.reconnect_interval", interval);
+ job.setInt("kafka.output.bufsize", bufSize);
+ job.setInt("kafka.output.max_msgsize", maxSize);
+
+ Properties props = new Properties();
+ props.setProperty("host", uri.getHost());
+ props.setProperty("port", Integer.toString(uri.getPort()));
+ props.setProperty("buffer.size", Integer.toString(bufSize));
+ props.setProperty("connect.timeout.ms", Integer.toString(timeout));
+ props.setProperty("reconnect.interval", Integer.toString(interval));
+ props.setProperty("max.message.size", Integer.toString(maxSize));
+
+ SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
+ return new KafkaRecordWriter<W>(producer, topic, queueSize);
+ }
+}
+
Added: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.bridge.hadoop;
+
+import kafka.message.Message;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.javaapi.producer.SyncProducer;
+
+import kafka.message.NoCompressionCodec;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
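+/**
+ * RecordWriter that accumulates messages in memory and sends each batch to the
+ * broker with a single SyncProducer.send() once the accumulated size exceeds
+ * queueSize; close() flushes any remaining messages.
+ */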
+public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<NullWritable, W>
+{
+ protected SyncProducer producer;
+ protected String topic;
+
+ protected List<Message> msgList = new ArrayList<Message>();
+ protected int totalSize = 0;
+ protected int queueSize;
+
+ public KafkaRecordWriter(SyncProducer producer, String topic, int queueSize)
+ {
+ this.producer = producer;
+ this.topic = topic;
+ this.queueSize = queueSize;
+ }
+
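+ // flush the accumulated batch to the broker as one uncompressed message set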
+ protected void sendMsgList()
+ {
+ if (msgList.size() > 0) {
+ ByteBufferMessageSet msgSet = new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, msgList);
+ producer.send(topic, msgSet);
+ msgList.clear();
+ totalSize = 0;
+ }
+ }
+
+ @Override
+ public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
+ {
+ // copy only getLength() bytes: BytesWritable.getBytes() returns the full backing array
+ byte[] bytes = new byte[value.getLength()];
+ System.arraycopy(value.getBytes(), 0, bytes, 0, value.getLength());
+ Message msg = new Message(bytes);
+ msgList.add(msg);
+ totalSize += msg.size();
+
+ if (totalSize > queueSize)
+ sendMsgList();
+ }
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+ {
+ sendMsgList();
+ producer.close();
+ }
+}
Added: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.bridge.pig;
+
+import kafka.bridge.hadoop.KafkaOutputFormat;
+import kafka.bridge.hadoop.KafkaRecordWriter;
+
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Encoder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter;
+import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
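+/**
+ * Pig StoreFunc that serializes each output tuple as Avro, using the schema
+ * given to the constructor, and publishes the encoded bytes to Kafka through
+ * KafkaOutputFormat.
+ */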
+public class AvroKafkaStorage extends StoreFunc
+{
+ protected KafkaRecordWriter writer;
+ protected org.apache.avro.Schema avroSchema;
+ protected PigAvroDatumWriter datumWriter;
+ protected Encoder encoder;
+ protected ByteArrayOutputStream os;
+
+ public AvroKafkaStorage(String schema)
+ {
+ this.avroSchema = org.apache.avro.Schema.parse(schema);
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException
+ {
+ return new KafkaOutputFormat();
+ }
+
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+ {
+ return location;
+ }
+
+ @Override
+ public void setStoreLocation(String uri, Job job) throws IOException
+ {
+ KafkaOutputFormat.setOutputPath(job, new Path(uri));
+ }
+
+ @Override
+ public void prepareToWrite(RecordWriter writer) throws IOException
+ {
+ if (this.avroSchema == null)
+ throw new IllegalStateException("avroSchema shouldn't be null");
+
+ this.writer = (KafkaRecordWriter) writer;
+ this.datumWriter = new PigAvroDatumWriter(this.avroSchema);
+ this.os = new ByteArrayOutputStream();
+ this.encoder = new BinaryEncoder(this.os);
+ }
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException
+ {
+ }
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature)
+ {
+ }
+
+ @Override
+ public void checkSchema(ResourceSchema schema) throws IOException
+ {
+ this.avroSchema = PigSchema2Avro.validateAndConvert(avroSchema, schema);
+ }
+
+ protected void writeEnvelope(OutputStream os, Encoder enc) throws IOException
+ {
+ }
+
+ @Override
+ public void putNext(Tuple tuple) throws IOException
+ {
+ os.reset();
+ writeEnvelope(os, this.encoder);
+ datumWriter.write(tuple, this.encoder);
+ this.encoder.flush();
+
+ try {
+ this.writer.write(NullWritable.get(), new BytesWritable(this.os.toByteArray()));
+ }
+ catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+}
Added: incubator/kafka/trunk/core/lib/zkclient-0.1.0.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/lib/zkclient-0.1.0.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/core/lib/zkclient-0.1.0.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/core/lib/zookeeper-3.3.3.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/lib/zookeeper-3.3.3.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/core/lib/zookeeper-3.3.3.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import consumer.ConsumerConfig
+import org.apache.log4j.Logger
+import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
+import utils.Utils
+import org.apache.log4j.jmx.LoggerDynamicMBean
+
+object Kafka {
+ private val logger = Logger.getLogger(Kafka.getClass)
+
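+ // expects the path to server.properties and, optionally, a consumer
+ // properties file for the embedded consumer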
+ def main(args: Array[String]): Unit = {
+ val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j"
+ Utils.swallow(logger.warn, Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName))
+
+ if(args.length != 1 && args.length != 2) {
+ println("USAGE: java [options] " + classOf[KafkaServer].getSimpleName() + " server.properties [consumer.properties")
+ System.exit(1)
+ }
+
+ try {
+ var kafkaServerStartable: KafkaServerStartable = null
+ val props = Utils.loadProps(args(0))
+ val serverConfig = new KafkaConfig(props)
+ if (args.length == 2) {
+ val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1)))
+ kafkaServerStartable = new KafkaServerStartable(serverConfig, consumerConfig)
+ }
+ else
+ kafkaServerStartable = new KafkaServerStartable(serverConfig)
+
+ // attach shutdown handler to catch control-c
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ override def run() = {
+ kafkaServerStartable.shutdown
+ kafkaServerStartable.awaitShutdown
+ }
+ });
+
+ kafkaServerStartable.startup
+ kafkaServerStartable.awaitShutdown
+ }
+ catch {
+ case e => logger.fatal(e)
+ }
+ System.exit(0)
+ }
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/FetchRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/FetchRequest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio._
+import kafka.network._
+import kafka.utils._
+
+object FetchRequest {
+
+ def readFrom(buffer: ByteBuffer): FetchRequest = {
+ val topic = Utils.readShortString(buffer, "UTF-8")
+ val partition = buffer.getInt()
+ val offset = buffer.getLong()
+ val size = buffer.getInt()
+ new FetchRequest(topic, partition, offset, size)
+ }
+}
+
+class FetchRequest(val topic: String,
+ val partition: Int,
+ val offset: Long,
+ val maxSize: Int) extends Request(RequestKeys.Fetch) {
+
+ def writeTo(buffer: ByteBuffer) {
+ Utils.writeShortString(buffer, topic, "UTF-8")
+ buffer.putInt(partition)
+ buffer.putLong(offset)
+ buffer.putInt(maxSize)
+ }
+
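+ // 2-byte length prefix + topic + partition (4) + offset (8) + maxSize (4);
+ // note that topic.length assumes one byte per character (ASCII topic names)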
+ def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4
+
+ override def toString(): String = "FetchRequest(topic:" + topic + ", part:" + partition + ", offset:" + offset +
+ ", maxSize:" + maxSize + ")"
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio._
+import kafka.network._
+import kafka.utils._
+
+object MultiFetchRequest {
+ def readFrom(buffer: ByteBuffer): MultiFetchRequest = {
+ val count = buffer.getShort
+ val fetches = new Array[FetchRequest](count)
+ for(i <- 0 until fetches.length)
+ fetches(i) = FetchRequest.readFrom(buffer)
+ new MultiFetchRequest(fetches)
+ }
+}
+
+class MultiFetchRequest(val fetches: Array[FetchRequest]) extends Request(RequestKeys.MultiFetch) {
+ def writeTo(buffer: ByteBuffer) {
+ if(fetches.length > Short.MaxValue)
+ throw new IllegalArgumentException("Number of requests in MultiFetchRequest exceeds " + Short.MaxValue + ".")
+ buffer.putShort(fetches.length.toShort)
+ for(fetch <- fetches)
+ fetch.writeTo(buffer)
+ }
+
+ def sizeInBytes: Int = {
+ var size = 2
+ for(fetch <- fetches)
+ size += fetch.sizeInBytes
+ size
+ }
+
+ override def toString(): String = {
+ val buffer = new StringBuffer
+ for(fetch <- fetches) {
+ buffer.append(fetch.toString)
+ buffer.append(",")
+ }
+ buffer.toString
+ }
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchResponse.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchResponse.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchResponse.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio._
+import collection.mutable
+import kafka.utils.IteratorTemplate
+import kafka.message._
+
+class MultiFetchResponse(val buffer: ByteBuffer, val numSets: Int, val offsets: Array[Long]) extends Iterable[ByteBufferMessageSet] {
+ private val messageSets = new mutable.ListBuffer[ByteBufferMessageSet]
+
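+ // wire format per message set: 4-byte size, 2-byte error code, then payload
+ // (size counts the error code, hence the size - 2 payload length below)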
+ for(i <- 0 until numSets) {
+ val size = buffer.getInt()
+ val errorCode: Int = buffer.getShort()
+ val copy = buffer.slice()
+ val payloadSize = size - 2
+ copy.limit(payloadSize)
+ buffer.position(buffer.position + payloadSize)
+ messageSets += new ByteBufferMessageSet(copy, offsets(i), errorCode)
+ }
+
+ def iterator : Iterator[ByteBufferMessageSet] = {
+ new IteratorTemplate[ByteBufferMessageSet] {
+ val iter = messageSets.iterator
+
+ override def makeNext(): ByteBufferMessageSet = {
+ if(iter.hasNext)
+ iter.next
+ else
+ return allDone
+ }
+ }
+ }
+
+ override def toString() = this.messageSets.toString
+}