You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC

svn commit: r1152970 [14/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,156 @@
+package kafka.etl;
+
+import java.net.URI;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+
+@SuppressWarnings("deprecation")
+public class KafkaETLJob {
+    
+    public static final String HADOOP_PREFIX = "hadoop-conf.";
+    /**
+     * Create a job configuration
+     */
+    @SuppressWarnings("rawtypes")
+    public static JobConf createJobConf(String name, String topic, Props props, Class classobj) 
+    throws Exception {
+        JobConf conf = getJobConf(name, props, classobj);
+        
+        conf.set("topic", topic);
+        
+        // input format
+        conf.setInputFormat(KafkaETLInputFormat.class);
+
+        //turn off mapper speculative execution
+        conf.setMapSpeculativeExecution(false);
+        
+        // setup multiple outputs
+        MultipleOutputs.addMultiNamedOutput(conf, "offsets", SequenceFileOutputFormat.class, 
+                    KafkaETLKey.class, BytesWritable.class);
+
+
+        return conf;
+    }
+    
+    /**
+     * Helper function to initialize a job configuration
+     */
+    public static JobConf getJobConf(String name, Props props, Class classobj) throws Exception {
+        JobConf conf = new JobConf();
+        // set custom class loader with custom find resource strategy.
+
+        conf.setJobName(name);
+        String hadoop_ugi = props.getProperty("hadoop.job.ugi", null);
+        if (hadoop_ugi != null) {
+            conf.set("hadoop.job.ugi", hadoop_ugi);
+        }
+
+        if (props.getBoolean("is.local", false)) {
+            conf.set("mapred.job.tracker", "local");
+            conf.set("fs.default.name", "file:///");
+            conf.set("mapred.local.dir", "/tmp/map-red");
+
+            info("Running locally, no hadoop jar set.");
+        } else {
+            setClassLoaderAndJar(conf, classobj);
+            info("Setting hadoop jar file for class:" + classobj + "  to " + conf.getJar());
+            info("*************************************************************************");
+            info("          Running on Real Hadoop Cluster(" + conf.get("mapred.job.tracker") + ")           ");
+            info("*************************************************************************");
+        }
+
+        // set JVM options if present
+        if (props.containsKey("mapred.child.java.opts")) {
+            conf.set("mapred.child.java.opts", props.getProperty("mapred.child.java.opts"));
+            info("mapred.child.java.opts set to " + props.getProperty("mapred.child.java.opts"));
+        }
+
+        // Adds External jars to hadoop classpath
+        String externalJarList = props.getProperty("hadoop.external.jarFiles", null);
+        if (externalJarList != null) {
+            String[] jarFiles = externalJarList.split(",");
+            for (String jarFile : jarFiles) {
+                info("Adding extenral jar File:" + jarFile);
+                DistributedCache.addFileToClassPath(new Path(jarFile), conf);
+            }
+        }
+
+        // Adds distributed cache files
+        String cacheFileList = props.getProperty("hadoop.cache.files", null);
+        if (cacheFileList != null) {
+            String[] cacheFiles = cacheFileList.split(",");
+            for (String cacheFile : cacheFiles) {
+                info("Adding Distributed Cache File:" + cacheFile);
+                DistributedCache.addCacheFile(new URI(cacheFile), conf);
+            }
+        }
+
+        // Adds distributed cache files
+        String archiveFileList = props.getProperty("hadoop.cache.archives", null);
+        if (archiveFileList != null) {
+            String[] archiveFiles = archiveFileList.split(",");
+            for (String archiveFile : archiveFiles) {
+                info("Adding Distributed Cache Archive File:" + archiveFile);
+                DistributedCache.addCacheArchive(new URI(archiveFile), conf);
+            }
+        }
+
+        String hadoopCacheJarDir = props.getProperty("hdfs.default.classpath.dir", null);
+        if (hadoopCacheJarDir != null) {
+            FileSystem fs = FileSystem.get(conf);
+            if (fs != null) {
+                FileStatus[] status = fs.listStatus(new Path(hadoopCacheJarDir));
+
+                if (status != null) {
+                    for (int i = 0; i < status.length; ++i) {
+                        if (!status[i].isDir()) {
+                            Path path = new Path(hadoopCacheJarDir, status[i].getPath().getName());
+                            info("Adding Jar to Distributed Cache Archive File:" + path);
+
+                            DistributedCache.addFileToClassPath(path, conf);
+                        }
+                    }
+                } else {
+                    info("hdfs.default.classpath.dir " + hadoopCacheJarDir + " is empty.");
+                }
+            } else {
+                info("hdfs.default.classpath.dir " + hadoopCacheJarDir + " filesystem doesn't exist");
+            }
+        }
+
+        // May want to add this to HadoopUtils, but will await refactoring
+        for (String key : props.stringPropertyNames()) {
+            String lowerCase = key.toLowerCase();
+            if (lowerCase.startsWith(HADOOP_PREFIX)) {
+                String newKey = key.substring(HADOOP_PREFIX.length());
+                conf.set(newKey, props.getProperty(key));
+            }
+        }
+
+        KafkaETLUtils.setPropsInJob(conf, props);
+        
+        return conf;
+    }
+
+    public static void info(String message) {
+        System.out.println(message);
+    }
+
+    public static void setClassLoaderAndJar(JobConf conf,
+            @SuppressWarnings("rawtypes") Class jobClass) {
+        conf.setClassLoader(Thread.currentThread().getContextClassLoader());
+        String jar = KafkaETLUtils.findContainingJar(jobClass, Thread
+                .currentThread().getContextClassLoader());
+        if (jar != null) {
+            conf.setJar(jar);
+        }
+    }
+
+}

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,88 @@
+package kafka.etl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.WritableComparable;
+import kafka.etl.KafkaETLKey;
+
+public class KafkaETLKey implements WritableComparable<KafkaETLKey>{
+
+    protected int _inputIndex;
+    protected long _offset;
+    protected long _checksum;
+    
+    /**
+     * dummy empty constructor
+     */
+    public KafkaETLKey() {
+        _inputIndex = 0;
+        _offset = 0;
+        _checksum = 0;
+    }
+    
+    public KafkaETLKey (int index, long offset) {
+        _inputIndex =  index;
+        _offset = offset;
+        _checksum = 0;
+    }
+    
+    public KafkaETLKey (int index, long offset, long checksum) {
+        _inputIndex =  index;
+        _offset = offset;
+        _checksum = checksum;
+    }
+    
+    public void set(int index, long offset, long checksum) {
+        _inputIndex = index;
+        _offset = offset;
+        _checksum = checksum;
+    }
+    
+    public int getIndex() {
+        return _inputIndex;
+    }
+    
+    public long getOffset() {
+        return _offset;
+    }
+    
+    public long getChecksum() {
+        return _checksum;
+    }
+    
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        _inputIndex = in.readInt(); 
+        _offset = in.readLong();
+        _checksum = in.readLong();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(_inputIndex);
+        out.writeLong(_offset);
+        out.writeLong(_checksum);
+    }
+
+    @Override
+    public int compareTo(KafkaETLKey o) {
+        if (_inputIndex != o._inputIndex)
+            return _inputIndex = o._inputIndex;
+        else {
+            if  (_offset > o._offset) return 1;
+            else if (_offset < o._offset) return -1;
+            else {
+                if  (_checksum > o._checksum) return 1;
+                else if (_checksum < o._checksum) return -1;
+                else return 0;
+            }
+        }
+    }
+    
+    @Override
+    public String toString() {
+        return "index=" + _inputIndex + " offset=" + _offset + " checksum=" + _checksum;
+    }
+
+}

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,164 @@
+package kafka.etl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+
+@SuppressWarnings({ "deprecation" })
+public class KafkaETLRecordReader 
+extends SequenceFileRecordReader<KafkaETLKey, BytesWritable> {
+
+    /* max number of retries */
+    protected Props _props;   /*properties*/
+    protected JobConf _job;
+    protected Reporter _reporter ;
+    protected MultipleOutputs _mos;
+    protected List<KafkaETLContext> _contextList;
+    protected int _contextIndex ;
+    
+    protected long _totalBytes;
+    protected long _readBytes;
+    protected long _readCounts;
+    
+    protected String _attemptId = null;
+    
+    private static long _limit = 100; /*for testing only*/
+    
+    public KafkaETLRecordReader(InputSplit split, JobConf job, Reporter reporter) 
+    throws IOException {
+       super(job, (FileSplit) split);
+       
+       _props = KafkaETLUtils.getPropsFromJob(job);
+       _contextList = new ArrayList<KafkaETLContext>();
+       _job = job;
+       _reporter = reporter;
+       _contextIndex = -1;
+       _mos = new MultipleOutputs(job);
+       try {
+           _limit = _props.getInt("kafka.request.limit", -1);
+           
+           /*get attemp id*/
+           String taskId = _job.get("mapred.task.id");
+           if (taskId == null) {
+               throw new IllegalArgumentException(
+                                 "Configutaion does not contain the property mapred.task.id");
+           }
+           String[] parts = taskId.split("_");
+           if (    parts.length != 6 || !parts[0].equals("attempt") 
+                || (!"m".equals(parts[3]) && !"r".equals(parts[3]))) {
+                   throw new IllegalArgumentException(
+                                 "TaskAttemptId string : " + taskId + " is not properly formed");
+           }
+          _attemptId = parts[4]+parts[3];
+       }catch (Exception e) {
+           throw new IOException (e);
+       }
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        super.close();
+        
+        /* now record some stats */
+        for (KafkaETLContext context: _contextList) {
+            context.output(_attemptId);
+            context.close();
+        }
+        
+        _mos.close();
+    }
+
+    @Override
+    public KafkaETLKey createKey() {
+        return super.createKey();
+    }
+
+    @Override
+    public BytesWritable createValue() {
+        return super.createValue();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+        if (_totalBytes == 0) return 0f;
+        
+        if (_contextIndex >= _contextList.size()) return 1f;
+        
+        if (_limit < 0) {
+            double p = ( _readBytes + getContext().getReadBytes() ) / ((double) _totalBytes);
+            return (float)p;
+        }
+        else {
+            double p = (_readCounts + getContext().getCount()) / ((double)_limit * _contextList.size());
+            return (float)p;
+        }
+    }
+
+    @Override
+    public synchronized boolean next(KafkaETLKey key, BytesWritable value)
+                                    throws IOException {
+    try{
+        if (_contextIndex < 0) { /* first call, get all requests */
+            System.out.println("RecordReader.next init()");
+            _totalBytes = 0;
+            
+            while ( super.next(key, value)) {
+                String input = new String(value.getBytes(), "UTF-8");
+                int index = _contextList.size();
+                KafkaETLContext context = new KafkaETLContext(
+                                              _job, _props, _reporter, _mos, index, input);
+                _contextList.add(context);
+                _totalBytes += context.getTotalBytes();
+            }
+            System.out.println("Number of requests=" + _contextList.size());
+            
+            _readBytes = 0;
+            _readCounts = 0;
+            _contextIndex = 0;
+        }
+        
+        while (_contextIndex < _contextList.size()) {
+            
+            KafkaETLContext currContext = getContext();
+            
+            while (currContext.hasMore() && 
+                       (_limit < 0 || currContext.getCount() < _limit)) {
+                
+                if (currContext.getNext(key, value)) {
+                    //System.out.println("RecordReader.next get (key,value)");
+                    return true;
+                }
+                else {
+                    //System.out.println("RecordReader.next fetch more");
+                    currContext.fetchMore();
+                }
+            }
+            
+            _readBytes += currContext.getReadBytes();
+            _readCounts += currContext.getCount();
+            _contextIndex ++;
+            System.out.println("RecordReader.next will get from request " + _contextIndex);
+       }
+    }catch (Exception e) {
+        throw new IOException (e);
+    }
+    return false;
+    }
+    
+    protected KafkaETLContext getContext() throws IOException{
+        if (_contextIndex >= _contextList.size()) 
+            throw new IOException ("context index " + _contextIndex + " is out of bound " 
+                                            + _contextList.size());
+        return _contextList.get(_contextIndex);
+    }
+
+    
+
+}

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,112 @@
+package kafka.etl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+public class KafkaETLRequest {
+    public static long DEFAULT_OFFSET = -1;
+    public static String DELIM = "\t";
+    
+    String _topic;
+    URI _uri;
+    int _partition;
+    long _offset = DEFAULT_OFFSET;
+    
+    public KafkaETLRequest() {
+        
+    }
+    
+    public KafkaETLRequest(String input) throws IOException {
+        //System.out.println("Init request from " + input);
+        String[] pieces = input.trim().split(DELIM);
+        if (pieces.length != 4)
+            throw new IOException( input + 
+                                            " : input must be in the form 'url" + DELIM +
+                                            "topic" + DELIM +"partition" + DELIM +"offset'");
+
+        try {
+            _uri = new URI (pieces[0]); 
+        }catch (java.net.URISyntaxException e) {
+            throw new IOException (e);
+        }
+        _topic = pieces[1];
+        _partition = Integer.valueOf(pieces[2]);
+        _offset = Long.valueOf(pieces[3]);
+    }
+    
+    public KafkaETLRequest(String node, String topic, String partition, String offset, 
+                                    Map<String, String> nodes) throws IOException {
+
+        Integer nodeId = Integer.parseInt(node);
+        String uri = nodes.get(nodeId.toString());
+        if (uri == null) throw new IOException ("Cannot form node for id " + nodeId);
+        
+        try {
+            _uri = new URI (uri); 
+        }catch (java.net.URISyntaxException e) {
+            throw new IOException (e);
+        }
+        _topic = topic;
+        _partition = Integer.valueOf(partition);
+        _offset = Long.valueOf(offset);
+    }
+    
+    public KafkaETLRequest(String topic, String uri, int partition) throws URISyntaxException {
+        _topic = topic;
+        _uri = new URI(uri);
+        _partition = partition;
+    }
+    
+    public void setDefaultOffset() {
+        _offset = DEFAULT_OFFSET;
+    }
+    
+    public void setOffset(long offset) {
+        _offset = offset;
+    }
+    
+    public String getTopic() { return _topic;}
+    public URI getURI () { return _uri;}
+    public int getPartition() { return _partition;}
+    
+    public long getOffset() { return _offset;}
+
+    public boolean isValidOffset() {
+        return _offset >= 0;
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if (! (o instanceof KafkaETLRequest))
+            return false;
+        
+        KafkaETLRequest r = (KafkaETLRequest) o;
+        return this._topic.equals(r._topic) ||
+                    this._uri.equals(r._uri) ||
+                    this._partition == r._partition;
+    }
+
+    @Override
+    public int hashCode() {
+        return toString(0).hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return toString(_offset);
+    }
+    
+
+    public String toString (long offset) {
+    
+        return 
+        _uri + DELIM +
+        _topic + DELIM +
+        _partition + DELIM +
+       offset;
+    }
+    
+
+}

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.etl;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.BytesWritable;
+
+public class KafkaETLUtils {
+
+	public static PathFilter PATH_FILTER = new PathFilter() {
+		@Override
+		public boolean accept(Path path) {
+			return !path.getName().startsWith("_")
+					&& !path.getName().startsWith(".");
+		}
+	};
+
+	
+	public static Path getLastPath(Path path, FileSystem fs) throws IOException {
+
+		FileStatus[] statuses = fs.listStatus(path, PATH_FILTER);
+
+		if (statuses.length == 0) {
+			return path;
+		} else {
+			Arrays.sort(statuses);
+			return statuses[statuses.length - 1].getPath();
+		}
+	}
+
+	public static String getFileName(Path path) throws IOException {
+		String fullname = path.toUri().toString();
+		String[] parts = fullname.split(Path.SEPARATOR);
+		if (parts.length < 1)
+			throw new IOException("Invalid path " + fullname);
+		return parts[parts.length - 1];
+	}
+
+	public static List<String> readText(FileSystem fs, String inputFile)
+			throws IOException, FileNotFoundException {
+		Path path = new Path(inputFile);
+		return readText(fs, path);
+	}
+
+	public static List<String> readText(FileSystem fs, Path path)
+			throws IOException, FileNotFoundException {
+		if (!fs.exists(path)) {
+			throw new FileNotFoundException("File " + path + " doesn't exist!");
+		}
+		BufferedReader in = new BufferedReader(new InputStreamReader(
+				fs.open(path)));
+		List<String> buf = new ArrayList<String>();
+		String line = null;
+
+		while ((line = in.readLine()) != null) {
+			if (line.trim().length() > 0)
+				buf.add(new String(line.trim()));
+		}
+		in.close();
+		return buf;
+	}
+
+	public static void writeText(FileSystem fs, Path outPath, String content)
+			throws IOException {
+		long timestamp = System.currentTimeMillis();
+		String localFile = "/tmp/KafkaETL_tmp_" + timestamp;
+		PrintWriter writer = new PrintWriter(new FileWriter(localFile));
+		writer.println(content);
+		writer.close();
+
+		Path src = new Path(localFile);
+		fs.moveFromLocalFile(src, outPath);
+	}
+
+	public static Props getPropsFromJob(Configuration conf) {
+		String propsString = conf.get("kafka.etl.props");
+		if (propsString == null)
+			throw new UndefinedPropertyException(
+					"The required property kafka.etl.props was not found in the Configuration.");
+		try {
+			ByteArrayInputStream input = new ByteArrayInputStream(
+					propsString.getBytes("UTF-8"));
+			Properties properties = new Properties();
+			properties.load(input);
+			return new Props(properties);
+		} catch (IOException e) {
+			throw new RuntimeException("This is not possible!", e);
+		}
+	}
+
+	 public static void setPropsInJob(Configuration conf, Props props)
+	  {
+	    ByteArrayOutputStream output = new ByteArrayOutputStream();
+	    try
+	    {
+	      props.store(output);
+	      conf.set("kafka.etl.props", new String(output.toByteArray(), "UTF-8"));
+	    }
+	    catch (IOException e)
+	    {
+	      throw new RuntimeException("This is not possible!", e);
+	    }
+	  }
+	 
+	public static Props readProps(String file) throws IOException {
+		Path path = new Path(file);
+		FileSystem fs = path.getFileSystem(new Configuration());
+		if (fs.exists(path)) {
+			InputStream input = fs.open(path);
+			try {
+				// wrap it up in another layer so that the user can override
+				// properties
+				Props p = new Props(input);
+				return new Props(p);
+			} finally {
+				input.close();
+			}
+		} else {
+			return new Props();
+		}
+	}
+
+	public static String findContainingJar(
+			@SuppressWarnings("rawtypes") Class my_class, ClassLoader loader) {
+		String class_file = my_class.getName().replaceAll("\\.", "/")
+				+ ".class";
+		return findContainingJar(class_file, loader);
+	}
+
+	public static String findContainingJar(String fileName, ClassLoader loader) {
+		try {
+			for (@SuppressWarnings("rawtypes")
+			Enumeration itr = loader.getResources(fileName); itr
+					.hasMoreElements();) {
+				URL url = (URL) itr.nextElement();
+				// logger.info("findContainingJar finds url:" + url);
+				if ("jar".equals(url.getProtocol())) {
+					String toReturn = url.getPath();
+					if (toReturn.startsWith("file:")) {
+						toReturn = toReturn.substring("file:".length());
+					}
+					toReturn = URLDecoder.decode(toReturn, "UTF-8");
+					return toReturn.replaceAll("!.*$", "");
+				}
+			}
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		return null;
+	}
+
+    public static byte[] getBytes(BytesWritable val) {
+        
+        byte[] buffer = val.getBytes();
+        
+        /* FIXME: remove the following part once the below gira is fixed
+         * https://issues.apache.org/jira/browse/HADOOP-6298
+         */
+        long len = val.getLength();
+        byte [] bytes = buffer;
+        if (len < buffer.length) {
+            bytes = new byte[(int) len];
+            System.arraycopy(buffer, 0, bytes, 0, (int)len);
+        }
+        
+        return bytes;
+    }
+
+}

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,460 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.etl;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.log4j.Logger;
+
+public class Props extends Properties {
+
+	private static final long serialVersionUID = 1L;
+	private static Logger logger = Logger.getLogger(Props.class);
+	
+	/**
+	 * default constructor
+	 */
+	public Props() {
+		super();
+	}
+
+	/**
+	 * copy constructor 
+	 * @param props
+	 */
+	public Props(Props props) {
+		if (props != null) {
+			this.put(props);
+		}
+	}
+	
+	/**
+	 * construct props from a list of files
+	 * @param files		paths of files
+	 * @throws FileNotFoundException
+	 * @throws IOException
+	 */
+	public Props(String... files) throws FileNotFoundException, IOException {
+		this(Arrays.asList(files));
+	}
+
+	/**
+	 * construct props from a list of files
+	 * @param files		paths of files
+	 * @throws FileNotFoundException
+	 * @throws IOException
+	 */
+	public Props(List<String> files) throws FileNotFoundException, IOException {
+
+		for (int i = 0; i < files.size(); i++) {
+			InputStream input = new BufferedInputStream(new FileInputStream(
+					new File(files.get(i)).getAbsolutePath()));
+			super.load(input);
+			input.close();
+		}
+	}
+
+	/**
+	 * construct props from a list of input streams
+	 * @param inputStreams
+	 * @throws IOException
+	 */
+	public Props(InputStream... inputStreams) throws IOException {
+		for (InputStream stream : inputStreams)
+			super.load(stream);
+	}
+
+	/**
+	 * construct props from a list of maps
+	 * @param props
+	 */
+	public Props(Map<String, String>... props) {
+		for (int i = props.length - 1; i >= 0; i--)
+			super.putAll(props[i]);
+	}
+
+	/**
+	 * construct props from a list of Properties
+	 * @param properties
+	 */
+	public Props(Properties... properties) {
+		for (int i = properties.length - 1; i >= 0; i--){
+			this.put(properties[i]);
+		}
+	}
+
+	/**
+	 * build props from a list of strings and interprate them as
+	 * key, value, key, value,....
+	 * 
+	 * @param args
+	 * @return
+	 */
+	@SuppressWarnings("unchecked")
+	public static Props of(String... args) {
+		if (args.length % 2 != 0)
+			throw new IllegalArgumentException(
+					"Must have an equal number of keys and values.");
+		Map<String, String> vals = new HashMap<String, String>(args.length / 2);
+		for (int i = 0; i < args.length; i += 2)
+			vals.put(args[i], args[i + 1]);
+		return new Props(vals);
+	}
+
+	/**
+	 * Put the given Properties into the Props. 
+	 * 
+	 * @param properties
+	 *            The properties to put
+	 * 
+	 */
+	public void put(Properties properties) {
+		for (String propName : properties.stringPropertyNames()) {
+			super.put(propName, properties.getProperty(propName));
+		}
+	}
+
+	/**
+	 * get property of "key" and split the value by " ," 
+	 * @param key		
+	 * @return
+	 */
+	public List<String> getStringList(String key) {
+		return getStringList(key, "\\s*,\\s*");
+	}
+
+	/**
+	 * get property of "key" and split the value by "sep"
+	 * @param key
+	 * @param sep
+	 * @return
+	 */
+	public List<String> getStringList(String key, String sep) {
+		String val =  super.getProperty(key);
+		if (val == null || val.trim().length() == 0)
+			return Collections.emptyList();
+
+		if (containsKey(key))
+			return Arrays.asList(val.split(sep));
+		else
+			throw new UndefinedPropertyException("Missing required property '"
+					+ key + "'");
+	}
+
+	/**
+	 * get string list with default value. default delimiter is ","
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 */
+	public List<String> getStringList(String key, List<String> defaultValue) {
+		if (containsKey(key))
+			return getStringList(key);
+		else
+			return defaultValue;
+	}
+
+	/**
+	 * get string list with default value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 */
+	public List<String> getStringList(String key, List<String> defaultValue,
+			String sep) {
+		if (containsKey(key))
+			return getStringList(key, sep);
+		else
+			return defaultValue;
+	}
+
+	@SuppressWarnings("unchecked")
+	protected <T> T getValue(String key, T defaultValue) 
+	throws Exception {
+		
+		if (containsKey(key)) {
+			Object value = super.get(key);
+			if (value.getClass().isInstance(defaultValue)) {
+				return (T)value;
+			} else if (value instanceof String) {
+				// call constructor(String) to initialize it
+				@SuppressWarnings("rawtypes")
+				Constructor ct = defaultValue.getClass().getConstructor(String.class);
+				String v = ((String)value).trim();
+				Object ret = ct.newInstance(v);
+				return (T) ret;
+			}
+			else throw new UndefinedPropertyException ("Property " + key + 
+					": cannot convert value of " + value.getClass().getName() + 
+					" to " + defaultValue.getClass().getName());
+		}
+		else {
+			return defaultValue;
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	protected <T> T getValue(String key, Class<T> mclass) 
+	throws Exception {
+		
+		if (containsKey(key)) {
+			Object value = super.get(key);
+			if (value.getClass().equals(mclass)) {
+				return (T)value;
+			} else if (value instanceof String) {
+				// call constructor(String) to initialize it
+				@SuppressWarnings("rawtypes")
+				Constructor ct = mclass.getConstructor(String.class);
+				String v = ((String)value).trim();
+				Object ret = ct.newInstance(v);
+				return (T) ret;
+			}
+			else throw new UndefinedPropertyException ("Property " + key + 
+					": cannot convert value of " + value.getClass().getName() + 
+					" to " + mclass.getClass().getName());
+		}
+		else {
+			throw new UndefinedPropertyException ("Missing required property '"
+					+ key + "'");
+		}
+	}
+
+	/**
+	 * get boolean value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 * @throws Exception 	if value is not of type boolean or string
+	 */
+	public Boolean getBoolean(String key, Boolean defaultValue) 
+	throws Exception {
+		return getValue (key, defaultValue);
+	}
+
+	/**
+	 * get boolean value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 * @throws Exception 	if value is not of type boolean or string or 
+	 * 										if value doesn't exist
+	 */
+	public Boolean getBoolean(String key) throws Exception {
+		return getValue (key, Boolean.class);
+	}
+
+	/**
+	 * get long value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 * @throws Exception 	if value is not of type long or string
+	 */
+	public Long getLong(String name, Long defaultValue) 
+	throws Exception {
+		return getValue(name, defaultValue);
+	}
+
+	/**
+	 * get long value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 * @throws Exception 	if value is not of type long or string or 
+	 * 										if value doesn't exist
+	 */
+	public Long getLong(String name) throws Exception  {
+		return getValue (name, Long.class);
+	}
+
+	/**
+	 * get integer value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 * @throws Exception 	if value is not of type integer or string
+	 */
+	public Integer getInt(String name, Integer defaultValue) 
+	throws Exception  {
+		return getValue(name, defaultValue);
+	}
+
+	/**
+	 * get integer value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 * @throws Exception 	if value is not of type integer or string or 
+	 * 										if value doesn't exist
+	 */
+	public Integer getInt(String name) throws Exception {
+		return getValue (name, Integer.class);
+	}
+
+	/**
+	 * get double value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 * @throws Exception 	if value is not of type double or string
+	 */
+	public Double getDouble(String name, double defaultValue) 
+	throws Exception {
+		return getValue(name, defaultValue);
+	}
+
+	/**
+	 * get double value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 * @throws Exception 	if value is not of type double or string or 
+	 * 										if value doesn't exist
+	 */
+	public double getDouble(String name) throws Exception {
+		return getValue(name, Double.class);
+	}
+
+	/**
+	 * get URI value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 * @throws Exception 	if value is not of type URI or string 
+	 */
+	public URI getUri(String name, URI defaultValue) throws Exception {
+		return getValue(name, defaultValue);
+	}
+
+	/**
+	 * get URI value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 * @throws Exception 	if value is not of type URI or string 
+	 */
+	public URI getUri(String name, String defaultValue) 
+	throws Exception {
+		URI defaultV = new URI(defaultValue);
+		return getValue(name, defaultV);
+	}
+
+	/**
+	 * get URI value
+	 * @param key
+	 * @param defaultValue
+	 * @return
+	 * @throws Exception 	if value is not of type URI or string or 
+	 * 										if value doesn't exist
+	 */
+	public URI getUri(String name) throws Exception {
+		return getValue(name, URI.class);
+	}
+
+	/**
+	 * compare two props 
+	 * @param p
+	 * @return
+	 */
+	public boolean equalsProps(Props p) {
+		if (p == null) {
+			return false;
+		}
+
+		final Set<String> myKeySet = getKeySet();
+		for (String s : myKeySet) {
+			if (!get(s).equals(p.get(s))) {
+				return false;
+			}
+		}
+
+		return myKeySet.size() == p.getKeySet().size();
+	}
+
+
+	/**
+	 * Get a map of all properties by string prefix
+	 * 
+	 * @param prefix
+	 *            The string prefix
+	 */
+	public Map<String, String> getMapByPrefix(String prefix) {
+		Map<String, String> values = new HashMap<String, String>();
+
+		for (String key : super.stringPropertyNames()) {
+			if (key.startsWith(prefix)) {
+				values.put(key.substring(prefix.length()), super.getProperty(key));
+			}
+		}
+		return values;
+	}
+
+    /**
+     * Store all properties
+     * 
+     * @param out The stream to write to
+     * @throws IOException If there is an error writing
+     */
+    public void store(OutputStream out) throws IOException {
+           super.store(out, null);
+    }
+    
+    /**
+     * get all property names
+     * @return
+     */
+	public Set<String> getKeySet() {
+		return super.stringPropertyNames();
+	}
+
+	/**
+	 * log properties
+	 * @param comment
+	 */
+	public void logProperties(String comment) {
+		logger.info(comment);
+
+		for (String key : getKeySet()) {
+			logger.info("  key=" + key + " value=" + get(key));
+		}
+	}
+
+	/**
+	 * clone a Props
+	 * @param p
+	 * @return
+	 */
+	public static Props clone(Props p) {
+		return new Props(p);
+	}
+
+
+}

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.etl;
+
+public class UndefinedPropertyException extends RuntimeException {
+
+	private static final long serialVersionUID = 1;
+
+	public UndefinedPropertyException(String message) {
+		super(message);
+	}
+
+}

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.etl.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import kafka.message.NoCompressionCodec;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+
+import kafka.etl.KafkaETLKey;
+import kafka.etl.KafkaETLRequest;
+import kafka.etl.KafkaETLUtils;
+import kafka.etl.Props;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.javaapi.producer.SyncProducer;
+import kafka.producer.SyncProducerConfig;
+
+/**
+ * Use this class to produce test events to Kafka server. Each event contains a
+ * random timestamp in text format.
+ */
+@SuppressWarnings("deprecation")
+public class DataGenerator {
+
+	protected final static Random RANDOM = new Random(
+			System.currentTimeMillis());
+
+	protected Props _props;
+	protected SyncProducer _producer = null;
+	protected URI _uri = null;
+	protected String _topic;
+	protected int _count;
+	protected String _offsetsDir;
+	protected final int TCP_BUFFER_SIZE = 300 * 1000;
+	protected final int CONNECT_TIMEOUT = 20000; // ms
+	protected final int RECONNECT_INTERVAL = Integer.MAX_VALUE; // ms
+
+	public DataGenerator(String id, Props props) throws Exception {
+		_props = props;
+		_topic = props.getProperty("kafka.etl.topic");
+		System.out.println("topics=" + _topic);
+		_count = props.getInt("event.count");
+
+		_offsetsDir = _props.getProperty("input");
+		
+		// initialize kafka producer to generate count events
+		String serverUri = _props.getProperty("kafka.server.uri");
+		_uri = new URI (serverUri);
+		
+		System.out.println("server uri:" + _uri.toString());
+        Properties producerProps = new Properties();
+        producerProps.put("host", _uri.getHost());
+        producerProps.put("port", String.valueOf(_uri.getPort()));
+        producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE));
+        producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
+        producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
+		_producer = new SyncProducer(new SyncProducerConfig(producerProps));
+			
+	}
+
+	public void run() throws Exception {
+
+		List<Message> list = new ArrayList<Message>();
+		for (int i = 0; i < _count; i++) {
+			Long timestamp = RANDOM.nextLong();
+			if (timestamp < 0) timestamp = -timestamp;
+			byte[] bytes = timestamp.toString().getBytes("UTF8");
+			Message message = new Message(bytes);
+			list.add(message);
+		}
+		// send events
+		System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
+		_producer.send(_topic, new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, list));
+
+		// close the producer
+		_producer.close();
+		
+		// generate offset files
+		generateOffsets();
+	}
+
+    protected void generateOffsets() throws Exception {
+        JobConf conf = new JobConf();
+        conf.set("hadoop.job.ugi", _props.getProperty("hadoop.job.ugi"));
+        conf.setCompressMapOutput(false);
+        Path outPath = new Path(_offsetsDir + Path.SEPARATOR + "1.dat");
+        FileSystem fs = outPath.getFileSystem(conf);
+        if (fs.exists(outPath)) fs.delete(outPath);
+        
+        KafkaETLRequest request =
+            new KafkaETLRequest(_topic, "tcp://" + _uri.getHost() + ":" + _uri.getPort(), 0);
+
+        System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString());
+        byte[] bytes = request.toString().getBytes("UTF-8");
+        KafkaETLKey dummyKey = new KafkaETLKey();
+        SequenceFile.setCompressionType(conf, SequenceFile.CompressionType.NONE);
+        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outPath, 
+                                        KafkaETLKey.class, BytesWritable.class);
+        writer.append(dummyKey, new BytesWritable(bytes));
+        writer.close();
+    }
+    
+	public static void main(String[] args) throws Exception {
+
+		if (args.length < 1)
+			throw new Exception("Usage: - config_file");
+
+		Props props = new Props(args[0]);
+		DataGenerator job = new DataGenerator("DataGenerator", props);
+		job.run();
+	}
+
+}

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.etl.impl;
+
+import kafka.etl.KafkaETLInputFormat;
+import kafka.etl.KafkaETLJob;
+import kafka.etl.Props;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+/**
+ * This is a simple Kafka ETL job which pull text events generated by
+ * DataGenerator and store them in hdfs
+ */
+@SuppressWarnings("deprecation")
+public class SimpleKafkaETLJob {
+
+    protected String _name;
+    protected Props _props;
+    protected String _input;
+    protected String _output;
+    protected String _topic;
+    
+	public SimpleKafkaETLJob(String name, Props props) throws Exception {
+		_name = name;
+		_props = props;
+		
+		_input = _props.getProperty("input");
+		_output = _props.getProperty("output");
+		
+		_topic = props.getProperty("kafka.etl.topic");
+	}
+
+
+	protected JobConf createJobConf() throws Exception {
+		JobConf jobConf = KafkaETLJob.createJobConf("SimpleKafakETL", _topic, _props, getClass());
+		
+		jobConf.setMapperClass(SimpleKafkaETLMapper.class);
+		KafkaETLInputFormat.setInputPaths(jobConf, new Path(_input));
+		
+		jobConf.setOutputKeyClass(LongWritable.class);
+		jobConf.setOutputValueClass(Text.class);
+		jobConf.setOutputFormat(TextOutputFormat.class);
+		TextOutputFormat.setCompressOutput(jobConf, false);
+		Path output = new Path(_output);
+		FileSystem fs = output.getFileSystem(jobConf);
+		if (fs.exists(output)) fs.delete(output);
+		TextOutputFormat.setOutputPath(jobConf, output);
+		
+		jobConf.setNumReduceTasks(0);
+		return jobConf;
+	}
+	
+    public void execute () throws Exception {
+        JobConf conf = createJobConf();
+        RunningJob runningJob = new JobClient(conf).submitJob(conf);
+        String id = runningJob.getJobID();
+        System.out.println("Hadoop job id=" + id);
+        runningJob.waitForCompletion();
+        
+        if (!runningJob.isSuccessful()) 
+            throw new Exception("Hadoop ETL job failed! Please check status on http://"
+                                         + conf.get("mapred.job.tracker") + "/jobdetails.jsp?jobid=" + id);
+    }
+
+	/**
+	 * for testing only
+	 * 
+	 * @param args
+	 * @throws Exception
+	 */
+	public static void main(String[] args) throws Exception {
+
+		if (args.length < 1)
+			throw new Exception("Usage: - config_file");
+
+		Props props = new Props(args[0]);
+		SimpleKafkaETLJob job = new SimpleKafkaETLJob("SimpleKafkaETLJob",
+				props);
+		job.execute();
+	}
+
+}

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.etl.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import kafka.etl.KafkaETLKey;
+import kafka.etl.KafkaETLUtils;
+import kafka.message.Message;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Simple implementation of KafkaETLMapper. It assumes that 
+ * input data are text timestamp (long).
+ */
+@SuppressWarnings("deprecation")
+public class SimpleKafkaETLMapper implements
+Mapper<KafkaETLKey, BytesWritable, LongWritable, Text> {
+
+    protected long _count = 0;
+    
+	protected Text getData(Message message) throws IOException {
+		ByteBuffer buf = message.payload();
+		
+		byte[] array = new byte[buf.limit()];
+		buf.get(array);
+		
+		Text text = new Text( new String(array, "UTF8"));
+		return text;
+	}
+
+
+    @Override
+    public void map(KafkaETLKey key, BytesWritable val,
+            OutputCollector<LongWritable, Text> collector,
+            Reporter reporter) throws IOException {
+        
+         
+        byte[] bytes = KafkaETLUtils.getBytes(val);
+        
+        //check the checksum of message
+        Message message = new Message(bytes);
+        long checksum = key.getChecksum();
+        if (checksum != message.checksum()) 
+            throw new IOException ("Invalid message checksum " 
+                                            + message.checksum() + ". Expected " + key + ".");
+        Text data = getData (message);
+        _count ++;
+           
+        collector.collect(new LongWritable (_count), data);
+
+    }
+
+
+    @Override
+    public void configure(JobConf arg0) {
+        // TODO Auto-generated method stub
+        
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        // TODO Auto-generated method stub
+        
+    }
+
+}

Added: incubator/kafka/trunk/contrib/hadoop-consumer/test/test.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/test/test.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/test/test.properties (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/test/test.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,28 @@
+# name of test topic
+kafka.etl.topic=SimpleTestEvent
+
+# hdfs location of jars
+hdfs.default.classpath.dir=/tmp/kafka/lib
+
+# number of test events to be generated
+event.count=1000
+
+# hadoop id and group
+hadoop.job.ugi=kafka,hadoop
+
+# kafka server uri
+kafka.server.uri=tcp://localhost:9092
+
+# hdfs location of input directory 
+input=/tmp/kafka/data
+
+# hdfs location of output directory
+output=/tmp/kafka/output
+
+# limit the number of events to be fetched;
+# value -1 means no limitation
+kafka.request.limit=-1
+
+# kafka parameters
+client.buffer.size=1048576
+client.so.timeout=60000

Added: incubator/kafka/trunk/contrib/hadoop-producer/README.md
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/README.md?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/README.md (added)
+++ incubator/kafka/trunk/contrib/hadoop-producer/README.md Mon Aug  1 23:41:24 2011
@@ -0,0 +1,134 @@
+Hadoop to Kafka Bridge
+======================
+
+What is it?
+-----------
+
+The Hadoop to Kafka bridge is a way to publish data from Hadoop to Kafka. There
+are two possible mechanisms, varying from easy to difficult:  writing a Pig
+script and writing messages in Avro format, or rolling your own job using the
+Kafka `OutputFormat`. 
+
+Note that there are no write-once semantics: any client of the data must handle
+messages in an idempotent manner. That is, because of node failures and
+Hadoop's failure recovery, it's possible that the same message is published
+multiple times in the same push.
+
+How do I use it?
+----------------
+
+With this bridge, Kafka topics are URIs and are specified as
+`kafka://<kafka-server>/<kafka-topic>`.
+
+### Pig ###
+
+Pig bridge writes data in binary Avro format with one message created per input
+row. To push data via Kafka, store to the Kafka URI using `AvroKafkaStorage`
+with the Avro schema as its first argument. You'll need to register the
+appropriate Kafka JARs. Here is what an example Pig script looks like:
+
+    REGISTER hadoop-kafka-bridge-0.5.2.jar;
+    REGISTER avro-1.4.0.jar;
+    REGISTER piggybank.jar;
+    REGISTER kafka-0.5.2.jar;
+    REGISTER jackson-core-asl-1.5.5.jar;
+    REGISTER jackson-mapper-asl-1.5.5.jar;
+    REGISTER scala-library.jar;
+
+    member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray);
+    names = FOREACH member_info GENERATE name;
+    STORE member_info INTO 'kafka://my-broker:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
+
+That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert
+from Pig's data model to the specified Avro schema.
+
+Further, multi-store is possible with KafkaStorage, so you can easily write to
+multiple topics and brokers in the same job:
+
+    SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
+    STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
+    STORE others INTO 'kafka://my-broker:9092/others' USING AvroKafkaStorage('$schema');
+
+### KafkaOutputFormat ###
+
+KafkaOutputFormat is a Hadoop OutputFormat for publishing data via Kafka. It
+uses the newer 0.20 mapreduce APIs and simply pushes bytes (i.e.,
+BytesWritable). This is a lower-level method of publishing data, as it allows
+you to precisely control output.
+
+Here is an example that publishes some input text. With KafkaOutputFormat, the
+key is a NullWritable and is ignored; only values are published. Speculative
+execution is turned off by the OutputFormat.
+
+    import kafka.bridge.hadoop.KafkaOutputFormat;
+    
+    import org.apache.hadoop.fs.Path;
+    import org.apache.hadoop.io.BytesWritable;
+    import org.apache.hadoop.io.NullWritable;
+    import org.apache.hadoop.io.Text;
+    import org.apache.hadoop.mapreduce.Job;
+    import org.apache.hadoop.mapreduce.Mapper;
+    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+    
+    import java.io.IOException;
+    
+    public class TextPublisher
+    {
+      public static void main(String[] args) throws Exception
+      {
+        if (args.length != 2) {
+          System.err.println("usage: <input path> <kafka output url>");
+          return;
+        }
+    
+        Job job = new Job();
+    
+        job.setJarByClass(TextPublisher.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(BytesWritable.class);
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setOutputFormatClass(KafkaOutputFormat.class);
+    
+        job.setMapperClass(TheMapper.class);
+        job.setNumReduceTasks(0);
+    
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+        KafkaOutputFormat.setOutputPath(job, new Path(args[1]));
+    
+        if (!job.waitForCompletion(true)) {
+          throw new RuntimeException("Job failed!");
+        }
+      }
+    
+      public static class TheMapper extends Mapper<Object, Object, NullWritable, BytesWritable>
+      {
+        @Override
+        protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
+        {
+          context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
+        }
+      }
+    }
+
+What can I tune?
+----------------
+
+Normally, you needn't change any of these parameters:
+
+* kafka.output.queue_size: Bytes to queue in memory before pushing to the Kafka
+  producer (i.e., the batch size). Default is 10*1024*1024 (10MB).
+* kafka.output.connect_timeout: Connection timeout in milliseconds (see Kafka
+  producer docs). Default is 30*1000 (30s).
+* kafka.output.reconnect_timeout: Milliseconds to wait until attempting
+  reconnection (see Kafka producer docs). Default is 1000 (1s).
+* kafka.output.bufsize: Producer buffer size in bytes (see Kafka producer
+  docs). Default is 64*1024 (64KB). 
+* kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer
+  docs). Default is 1024*1024 (1MB).
+
+For easier debugging, the above values as well as the Kafka URI
+(kafka.output.url), the output server (kafka.output.server), the topic
+(kafka.output.topic), and the schema (kafka.output.schema) are injected into
+the job's configuration.
+

Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/avro-1.4.0.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/avro-1.4.0.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/avro-1.4.0.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/commons-logging-1.0.4.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/commons-logging-1.0.4.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/commons-logging-1.0.4.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/hadoop-0.20.2-core.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/hadoop-0.20.2-core.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/hadoop-0.20.2-core.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-core-asl-1.5.5.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-core-asl-1.5.5.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-core-asl-1.5.5.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-mapper-asl-1.5.5.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-mapper-asl-1.5.5.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/jackson-mapper-asl-1.5.5.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/pig-0.8.0-core.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/pig-0.8.0-core.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/pig-0.8.0-core.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-producer/lib/piggybank.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/lib/piggybank.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-producer/lib/piggybank.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.bridge.examples;
+
+import kafka.bridge.hadoop.KafkaOutputFormat;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import java.io.IOException;
+
+public class TextPublisher
+{
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length != 2) {
+      System.err.println("usage: <input path> <kafka output url>");
+      return;
+    }
+
+    Job job = new Job();
+
+    job.setJarByClass(TextPublisher.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(BytesWritable.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(KafkaOutputFormat.class);
+
+    job.setMapperClass(TheMapper.class);
+    job.setNumReduceTasks(0);
+
+    FileInputFormat.addInputPath(job, new Path(args[0]));
+    KafkaOutputFormat.setOutputPath(job, new Path(args[1]));
+
+    if (!job.waitForCompletion(true)) {
+      throw new RuntimeException("Job failed!");
+    }
+  }
+
+  public static class TheMapper extends Mapper<Object, Object, NullWritable, BytesWritable>
+  {
+    @Override
+    protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
+    {
+      context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
+    }
+  }
+}
+

Added: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.bridge.hadoop;
+
+import java.util.Properties;
+
+import kafka.javaapi.producer.SyncProducer;
+import kafka.producer.SyncProducerConfig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+import java.io.IOException;
+import java.net.URI;
+
+public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<NullWritable, W>
+{
+  public static final String KAFKA_URL = "kafka.output.url";
+  public static final int KAFKA_PRODUCER_CONNECT_TIMEOUT = 30*1000;
+  public static final int KAFKA_PRODUCER_RECONNECT_INTERVAL = 1000;
+  public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64*1024;
+  public static final int KAFKA_PRODUCER_MAX_MESSAGE_SIZE = 1024*1024;
+  public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
+
+  public KafkaOutputFormat()
+  {
+    super();
+  }
+
+  public static void setOutputPath(Job job, Path outputUrl)
+  {
+    job.getConfiguration().set(KafkaOutputFormat.KAFKA_URL, outputUrl.toString());
+
+    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
+    job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
+  }
+
+  public static Path getOutputPath(JobContext job)
+  {
+    String name = job.getConfiguration().get(KafkaOutputFormat.KAFKA_URL);
+    return name == null ? null : new Path(name);
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException
+  {
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+  {
+    // Is there a programmatic way to get the temp dir? I see it hardcoded everywhere in Hadoop, Hive, and Pig.
+    return new FileOutputCommitter(new Path("/tmp/" + taskAttemptContext.getTaskAttemptID().getJobID().toString()), taskAttemptContext);
+  }
+
+  @Override
+  public RecordWriter<NullWritable, W> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
+  {
+    Path outputPath = getOutputPath(context);
+    if (outputPath == null)
+      throw new IllegalArgumentException("no kafka output url specified");
+    URI uri = outputPath.toUri();
+    Configuration job = context.getConfiguration();
+
+    final String topic = uri.getPath().substring(1);        // ignore the initial '/' in the path
+
+    final int queueSize = job.getInt("kafka.output.queue_size", KAFKA_QUEUE_SIZE);
+    final int timeout = job.getInt("kafka.output.connect_timeout", KAFKA_PRODUCER_CONNECT_TIMEOUT);
+    final int interval = job.getInt("kafka.output.reconnect_interval", KAFKA_PRODUCER_RECONNECT_INTERVAL);
+    final int bufSize = job.getInt("kafka.output.bufsize", KAFKA_PRODUCER_BUFFER_SIZE);
+    final int maxSize = job.getInt("kafka.output.max_msgsize", KAFKA_PRODUCER_MAX_MESSAGE_SIZE);
+
+    job.set("kafka.output.server", String.format("%s:%d", uri.getHost(), uri.getPort()));
+    job.set("kafka.output.topic", topic);
+    job.setInt("kafka.output.queue_size", queueSize);
+    job.setInt("kafka.output.connect_timeout", timeout);
+    job.setInt("kafka.output.reconnect_interval", interval);
+    job.setInt("kafka.output.bufsize", bufSize);
+    job.setInt("kafka.output.max_msgsize", maxSize);
+
+    if (uri.getHost().isEmpty())
+      throw new IllegalArgumentException("missing kafka server");
+    if (uri.getPath().isEmpty())
+      throw new IllegalArgumentException("missing kafka topic");
+
+    Properties props = new Properties();
+    props.setProperty("host", uri.getHost());
+    props.setProperty("port", Integer.toString(uri.getPort()));
+    props.setProperty("buffer.size", Integer.toString(bufSize));
+    props.setProperty("connect.timeout.ms", Integer.toString(timeout));
+    props.setProperty("reconnect.interval", Integer.toString(interval));
+    props.setProperty("max.message.size", Integer.toString(maxSize));
+
+    SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
+    return new KafkaRecordWriter<W>(producer, topic, queueSize);
+  }
+}
+

Added: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.bridge.hadoop;
+
+import kafka.message.Message;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.javaapi.producer.SyncProducer;
+
+import kafka.message.NoCompressionCodec;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<NullWritable, W>
+{
+  protected SyncProducer producer;
+  protected String topic;
+
+  protected List<Message> msgList = new ArrayList<Message>();
+  protected int totalSize = 0;
+  protected int queueSize;
+
+  public KafkaRecordWriter(SyncProducer producer, String topic, int queueSize)
+  {
+    this.producer = producer;
+    this.topic = topic;
+    this.queueSize = queueSize;
+  }
+
+  protected void sendMsgList()
+  {
+    if (msgList.size() > 0) {
+      ByteBufferMessageSet msgSet = new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, msgList);
+      producer.send(topic, msgSet);
+      msgList.clear();
+      totalSize = 0;
+    }
+  }
+
+  @Override
+  public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
+  {
+    Message msg = new Message(value.getBytes());
+    msgList.add(msg);
+    totalSize += msg.size();
+
+    if (totalSize > queueSize)
+      sendMsgList();
+  }
+
+  @Override
+  public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+  {
+    sendMsgList();
+    producer.close();
+  }
+}

Added: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.bridge.pig;
+
+import kafka.bridge.hadoop.KafkaOutputFormat;
+import kafka.bridge.hadoop.KafkaRecordWriter;
+
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Encoder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter;
+import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class AvroKafkaStorage extends StoreFunc
+{
+  protected KafkaRecordWriter writer;
+  protected org.apache.avro.Schema avroSchema;
+  protected PigAvroDatumWriter datumWriter;
+  protected Encoder encoder;
+  protected ByteArrayOutputStream os;
+
+  public AvroKafkaStorage(String schema)
+  {
+    this.avroSchema = org.apache.avro.Schema.parse(schema);
+  }
+
+  @Override
+  public OutputFormat getOutputFormat() throws IOException
+  {
+    return new KafkaOutputFormat();
+  }
+
+  @Override
+  public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+  {
+    return location;
+  }
+
+  @Override
+  public void setStoreLocation(String uri, Job job) throws IOException
+  {
+    KafkaOutputFormat.setOutputPath(job, new Path(uri));
+  }
+
+  @Override
+  public void prepareToWrite(RecordWriter writer) throws IOException
+  {
+    if (this.avroSchema == null)
+      throw new IllegalStateException("avroSchema shouldn't be null");
+
+    this.writer = (KafkaRecordWriter) writer;
+    this.datumWriter = new PigAvroDatumWriter(this.avroSchema);
+    this.os = new ByteArrayOutputStream();
+    this.encoder = new BinaryEncoder(this.os);
+  }
+
+  @Override
+  public void cleanupOnFailure(String location, Job job) throws IOException
+  {
+  }
+
+  @Override
+  public void setStoreFuncUDFContextSignature(String signature)
+  {
+  }
+
+  @Override
+  public void checkSchema(ResourceSchema schema) throws IOException
+  {
+    this.avroSchema = PigSchema2Avro.validateAndConvert(avroSchema, schema);
+  }
+
+  protected void writeEnvelope(OutputStream os, Encoder enc) throws IOException
+  {
+  }
+
+  @Override
+  public void putNext(Tuple tuple) throws IOException
+  {
+    os.reset();
+    writeEnvelope(os, this.encoder);
+    datumWriter.write(tuple, this.encoder);
+    this.encoder.flush();
+
+    try {
+      this.writer.write(NullWritable.get(), new BytesWritable(this.os.toByteArray()));
+    }
+    catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+}

Added: incubator/kafka/trunk/core/lib/zkclient-0.1.0.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/lib/zkclient-0.1.0.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/core/lib/zkclient-0.1.0.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/core/lib/zookeeper-3.3.3.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/lib/zookeeper-3.3.3.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/core/lib/zookeeper-3.3.3.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import consumer.ConsumerConfig
+import org.apache.log4j.Logger
+import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
+import utils.Utils
+import org.apache.log4j.jmx.LoggerDynamicMBean
+
+object Kafka {
+  private val logger = Logger.getLogger(Kafka.getClass)
+
+  def main(args: Array[String]): Unit = {
+    val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j"
+    Utils.swallow(logger.warn, Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName))
+
+    if(args.length != 1 && args.length != 2) {
+      println("USAGE: java [options] " + classOf[KafkaServer].getSimpleName() + " server.properties [consumer.properties")
+      System.exit(1)
+    }
+  
+    try {
+      var kafkaServerStartble: KafkaServerStartable = null
+      val props = Utils.loadProps(args(0))
+      val serverConfig = new KafkaConfig(props)
+      if (args.length == 2) {
+        val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1)))
+        kafkaServerStartble = new KafkaServerStartable(serverConfig, consumerConfig)
+      }
+      else
+        kafkaServerStartble = new KafkaServerStartable(serverConfig)
+
+      // attach shutdown handler to catch control-c
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        override def run() = {
+          kafkaServerStartble.shutdown
+          kafkaServerStartble.awaitShutdown
+        }
+      });
+
+      kafkaServerStartble.startup
+      kafkaServerStartble.awaitShutdown
+    }
+    catch {
+      case e => logger.fatal(e)
+    }
+    System.exit(0)
+  }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/FetchRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/FetchRequest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio._
+import kafka.network._
+import kafka.utils._
+
+object FetchRequest {
+    
+  def readFrom(buffer: ByteBuffer): FetchRequest = {
+    val topic = Utils.readShortString(buffer, "UTF-8")
+    val partition = buffer.getInt()
+    val offset = buffer.getLong()
+    val size = buffer.getInt()
+    new FetchRequest(topic, partition, offset, size)
+  }
+}
+
+class FetchRequest(val topic: String,
+                   val partition: Int,
+                   val offset: Long, 
+                   val maxSize: Int) extends Request(RequestKeys.Fetch) {
+  
+  def writeTo(buffer: ByteBuffer) {
+    Utils.writeShortString(buffer, topic, "UTF-8")
+    buffer.putInt(partition)
+    buffer.putLong(offset)
+    buffer.putInt(maxSize)
+  }
+  
+  def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4
+
+  override def toString(): String= "FetchRequest(topic:" + topic + ", part:" + partition +" offset:" + offset +
+    " maxSize:" + maxSize + ")"
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio._
+import kafka.network._
+import kafka.utils._
+import kafka.api._
+
+object MultiFetchRequest {
+  def readFrom(buffer: ByteBuffer): MultiFetchRequest = {
+    val count = buffer.getShort
+    val fetches = new Array[FetchRequest](count)
+    for(i <- 0 until fetches.length)
+      fetches(i) = FetchRequest.readFrom(buffer)
+    new MultiFetchRequest(fetches)
+  }
+}
+
+class MultiFetchRequest(val fetches: Array[FetchRequest]) extends Request(RequestKeys.MultiFetch) {
+  def writeTo(buffer: ByteBuffer) {
+    if(fetches.length > Short.MaxValue)
+      throw new IllegalArgumentException("Number of requests in MultiFetchRequest exceeds " + Short.MaxValue + ".")
+    buffer.putShort(fetches.length.toShort)
+    for(fetch <- fetches)
+      fetch.writeTo(buffer)
+  }
+  
+  def sizeInBytes: Int = {
+    var size = 2
+    for(fetch <- fetches)
+      size += fetch.sizeInBytes
+    size
+  }
+
+
+  override def toString(): String = {
+    val buffer = new StringBuffer
+    for(fetch <- fetches) {
+      buffer.append(fetch.toString)
+      buffer.append(",")
+    }
+    buffer.toString
+  }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchResponse.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchResponse.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchResponse.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio._
+import collection.mutable
+import kafka.utils.IteratorTemplate
+import kafka.message._
+
+class MultiFetchResponse(val buffer: ByteBuffer, val numSets: Int, val offsets: Array[Long]) extends Iterable[ByteBufferMessageSet] {
+  private val messageSets = new mutable.ListBuffer[ByteBufferMessageSet]
+  
+  for(i <- 0 until numSets) {
+    val size = buffer.getInt()
+    val errorCode: Int = buffer.getShort()
+    val copy = buffer.slice()
+    val payloadSize = size - 2
+    copy.limit(payloadSize)
+    buffer.position(buffer.position + payloadSize)
+    messageSets += new ByteBufferMessageSet(copy, offsets(i), errorCode)
+  }
+ 
+  def iterator : Iterator[ByteBufferMessageSet] = {
+    new IteratorTemplate[ByteBufferMessageSet] {
+      val iter = messageSets.iterator
+
+      override def makeNext(): ByteBufferMessageSet = {
+        if(iter.hasNext)
+          iter.next
+        else
+          return allDone
+      }
+    }
+  }
+
+  override def toString() = this.messageSets.toString
+}