You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/09 20:03:07 UTC

[1/2] kafka git commit: KAFKA-2783; Drop outdated hadoop contrib modules

Repository: kafka
Updated Branches:
  refs/heads/trunk 7073fa7ef -> 69af573b3


http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-producer/README.md
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/README.md b/contrib/hadoop-producer/README.md
deleted file mode 100644
index a5bef73..0000000
--- a/contrib/hadoop-producer/README.md
+++ /dev/null
@@ -1,94 +0,0 @@
-Hadoop to Kafka Bridge
-======================
-
-What's new?
------------
-
-* Kafka 0.8 support
-  * No more ZK-based load balancing (backwards incompatible change)
-* Semantic partitioning is now supported in KafkaOutputFormat. Just specify a
-  key in the output committer of your job. The Pig StoreFunc doesn't support
-  semantic partitioning.
-* Config parameters are now the same as the Kafka producer, just prepended with
-  kafka.output (e.g., kafka.output.max.message.size). This is a backwards
-  incompatible change.
-
-What is it?
------------
-
-The Hadoop to Kafka bridge is a way to publish data from Hadoop to Kafka. There
-are two possible mechanisms, varying from easy to difficult: writing a Pig
-script and writing messages in Avro format, or rolling your own job using the
-Kafka `OutputFormat`. 
-
-Note that there are no write-once semantics: any client of the data must handle
-messages in an idempotent manner. That is, because of node failures and
-Hadoop's failure recovery, it's possible that the same message is published
-multiple times in the same push.
-
-How do I use it?
-----------------
-
-With this bridge, Kafka topics are URIs and are specified as URIs of the form
-`kafka://<kafka-server>/<kafka-topic>` to connect to a specific Kafka broker.
-
-### Pig ###
-
-Pig bridge writes data in binary Avro format with one message created per input
-row. To push data via Kafka, store to the Kafka URI using `AvroKafkaStorage`
-with the Avro schema as its first argument. You'll need to register the
-appropriate Kafka JARs. Here is what an example Pig script looks like:
-
-    REGISTER hadoop-producer_2.8.0-0.8.0.jar;
-    REGISTER avro-1.4.0.jar;
-    REGISTER piggybank.jar;
-    REGISTER kafka-0.8.0.jar;
-    REGISTER jackson-core-asl-1.5.5.jar;
-    REGISTER jackson-mapper-asl-1.5.5.jar;
-    REGISTER scala-library.jar;
-
-    member_info = LOAD 'member_info.tsv' AS (member_id : int, name : chararray);
-    names = FOREACH member_info GENERATE name;
-    STORE member_info INTO 'kafka://my-kafka:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
-
-That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert
-from Pig's data model to the specified Avro schema.
-
-Further, multi-store is possible with KafkaStorage, so you can easily write to
-multiple topics and brokers in the same job:
-
-    SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
-    STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
-    STORE others INTO 'kafka://my-broker2:9092/others' USING AvroKafkaStorage('$schema');
-
-### KafkaOutputFormat ###
-
-KafkaOutputFormat is a Hadoop OutputFormat for publishing data via Kafka. It
-uses the newer 0.20 mapreduce APIs and simply pushes bytes (i.e.,
-BytesWritable). This is a lower-level method of publishing data, as it allows
-you to precisely control output.
-
-Included is an example that publishes some input text line-by-line to a topic.
-With KafkaOutputFormat, the key can be a null, where it is ignored by the
-producer (random partitioning), or any object for semantic partitioning of the
-stream (with an appropriate Kafka partitioner set). Speculative execution is
-turned off by the OutputFormat.
-
-What can I tune?
-----------------
-
-* kafka.output.queue.bytes: Bytes to queue in memory before pushing to the Kafka
-  producer (i.e., the batch size). Default is 1,000,000 (1 million) bytes.
-
-Any of Kafka's producer parameters can be changed by prefixing them with
-"kafka.output" in one's job configuration. For example, to change the
-compression codec, one would add the "kafka.output.compression.codec" parameter
-(e.g., "SET kafka.output.compression.codec 0" in one's Pig script for no
-compression). 
-
-For easier debugging, the above values as well as the Kafka broker information
-(kafka.metadata.broker.list), the topic (kafka.output.topic), and the schema
-(kafka.output.schema) are injected into the job's configuration. By default,
-the Hadoop producer uses Kafka's sync producer as asynchronous operation
-doesn't make sense in the batch Hadoop case.
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
deleted file mode 100644
index d447b1d..0000000
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.bridge.examples;
-
-import java.io.IOException;
-import kafka.bridge.hadoop.KafkaOutputFormat;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
-/**
- * Publish a text file line by line to a Kafka topic
- */
-public class TextPublisher
-{
-  public static void main(String[] args) throws Exception
-  {
-    if (args.length != 2) {
-      System.err.println("usage: <input path> <kafka output url>");
-      return;
-    }
-
-    Job job = new Job();
-
-    job.setJarByClass(TextPublisher.class);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setOutputFormatClass(KafkaOutputFormat.class);
-
-    job.setMapperClass(TheMapper.class);
-    job.setNumReduceTasks(0);
-
-    FileInputFormat.addInputPath(job, new Path(args[0]));
-    KafkaOutputFormat.setOutputPath(job, new Path(args[1]));
-
-    if (!job.waitForCompletion(true)) {
-      throw new RuntimeException("Job failed!");
-    }
-  }
-
-  public static class TheMapper extends Mapper<Object, Text, Object, Object>
-  {
-    @Override
-    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException
-    {
-      context.write(null, value.getBytes());
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
deleted file mode 100644
index 417b4b3..0000000
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.bridge.hadoop;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.*;
-
-import kafka.common.KafkaException;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.ProducerConfig;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.log4j.Logger;
-
-public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
-{
-  private Logger log = Logger.getLogger(KafkaOutputFormat.class);
-
-  public static final String KAFKA_URL = "kafka.output.url";
-  /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window):
-   *  We set the default to a million bytes so that the server will not reject the batch of messages
-   *  with a MessageSizeTooLargeException. The actual size will be smaller after compression.
-   */
-  public static final int KAFKA_QUEUE_BYTES = 1000000;
-
-  public static final String KAFKA_CONFIG_PREFIX = "kafka.output";
-  private static final Map<String, String> kafkaConfigMap;
-  static {
-    Map<String, String> cMap = new HashMap<String, String>();
-
-    // default Hadoop producer configs
-    cMap.put("producer.type", "sync");
-    cMap.put("compression.codec", Integer.toString(1));
-    cMap.put("request.required.acks", Integer.toString(1));
-
-    kafkaConfigMap = Collections.unmodifiableMap(cMap);
-  }
-
-  public KafkaOutputFormat()
-  {
-    super();
-  }
-
-  public static void setOutputPath(Job job, Path outputUrl)
-  {
-    job.getConfiguration().set(KafkaOutputFormat.KAFKA_URL, outputUrl.toString());
-
-    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
-    job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
-  }
-
-  public static Path getOutputPath(JobContext job)
-  {
-    String name = job.getConfiguration().get(KafkaOutputFormat.KAFKA_URL);
-    return name == null ? null : new Path(name);
-  }
-
-  @Override
-  public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException
-  {
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
-  {
-    // Is there a programmatic way to get the temp dir? I see it hardcoded everywhere in Hadoop, Hive, and Pig.
-    return new FileOutputCommitter(new Path("/tmp/" + taskAttemptContext.getTaskAttemptID().getJobID().toString()), taskAttemptContext);
-  }
-
-  @Override
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
-  {
-    Path outputPath = getOutputPath(context);
-    if (outputPath == null)
-      throw new KafkaException("no kafka output url specified");
-    URI uri = URI.create(outputPath.toString());
-    Configuration job = context.getConfiguration();
-
-    Properties props = new Properties();
-    String topic;
-
-    props.putAll(kafkaConfigMap);                       // inject default configuration
-    for (Map.Entry<String, String> m : job) {           // handle any overrides
-      if (!m.getKey().startsWith(KAFKA_CONFIG_PREFIX))
-        continue;
-      if (m.getKey().equals(KAFKA_URL))
-        continue;
-
-      String kafkaKeyName = m.getKey().substring(KAFKA_CONFIG_PREFIX.length()+1);
-      props.setProperty(kafkaKeyName, m.getValue());    // set Kafka producer property
-    }
-
-    // inject Kafka producer props back into jobconf for easier debugging
-    for (Map.Entry<Object, Object> m : props.entrySet()) {
-      job.set(KAFKA_CONFIG_PREFIX + "." + m.getKey().toString(), m.getValue().toString());
-    }
-
-    // KafkaOutputFormat specific parameters
-    final int queueBytes = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.bytes", KAFKA_QUEUE_BYTES);
-
-    if (uri.getScheme().equals("kafka")) {
-      // using the direct broker list
-      // URL: kafka://<kafka host>/<topic>
-      // e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
-      String brokerList = uri.getAuthority();
-      props.setProperty("metadata.broker.list", brokerList);
-      job.set(KAFKA_CONFIG_PREFIX + ".metadata.broker.list", brokerList);
-
-      if (uri.getPath() == null || uri.getPath().length() <= 1)
-        throw new KafkaException("no topic specified in kafka uri");
-
-      topic = uri.getPath().substring(1);               // ignore the initial '/' in the path
-      job.set(KAFKA_CONFIG_PREFIX + ".topic", topic);
-      log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic));
-    } else
-      throw new KafkaException("missing scheme from kafka uri (must be kafka://)");
-
-    Producer<Object, byte[]> producer = new Producer<Object, byte[]>(new ProducerConfig(props));
-    return new KafkaRecordWriter<K, V>(producer, topic, queueBytes);
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
deleted file mode 100644
index 72c088d..0000000
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.bridge.hadoop;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
-{
-  protected Producer<Object, byte[]> producer;
-  protected String topic;
-
-  protected List<KeyedMessage<Object, byte[]>> msgList = new LinkedList<KeyedMessage<Object, byte[]>>();
-  protected int totalBytes = 0;
-  protected int queueBytes;
-
-  public KafkaRecordWriter(Producer<Object, byte[]> producer, String topic, int queueBytes)
-  {
-    this.producer = producer;
-    this.topic = topic;
-    this.queueBytes = queueBytes;
-  }
-
-  protected void sendMsgList() throws IOException
-  {
-    if (msgList.size() > 0) {
-      try {
-        producer.send(msgList);
-      }
-      catch (Exception e) {
-        throw new IOException(e);           // all Kafka exceptions become IOExceptions
-      }
-      msgList.clear();
-      totalBytes = 0;
-    }
-  }
-
-  @Override
-  public void write(K key, V value) throws IOException, InterruptedException
-  {
-    byte[] valBytes;
-    if (value instanceof byte[])
-      valBytes = (byte[]) value;
-    else if (value instanceof BytesWritable)
-      // BytesWritable.getBytes returns its internal buffer, so .length would refer to its capacity, not the
-      // intended size of the byte array contained.  We need to use BytesWritable.getLength for the true size.
-      valBytes = Arrays.copyOf(((BytesWritable) value).getBytes(), ((BytesWritable) value).getLength());
-    else
-      throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");
-
-    // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
-    // If the new message is going to make the message list tip over 1 million bytes, send the
-    // message list now.
-    if ((totalBytes + valBytes.length) > queueBytes || msgList.size() >= Short.MAX_VALUE)
-      sendMsgList();
-
-    msgList.add(new KeyedMessage<Object, byte[]>(this.topic, key, valBytes));
-    totalBytes += valBytes.length;
-  }
-
-  @Override
-  public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
-  {
-    sendMsgList();
-    producer.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
deleted file mode 100644
index d24a85a..0000000
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.bridge.pig;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import kafka.bridge.hadoop.KafkaOutputFormat;
-import kafka.bridge.hadoop.KafkaRecordWriter;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.Encoder;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter;
-import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
-
-public class AvroKafkaStorage extends StoreFunc
-{
-  protected KafkaRecordWriter<Object, byte[]> writer;
-  protected org.apache.avro.Schema avroSchema;
-  protected PigAvroDatumWriter datumWriter;
-  protected Encoder encoder;
-  protected ByteArrayOutputStream os;
-
-  public AvroKafkaStorage(String schema)
-  {
-    this.avroSchema = org.apache.avro.Schema.parse(schema);
-  }
-
-  @Override
-  public OutputFormat getOutputFormat() throws IOException
-  {
-    return new KafkaOutputFormat();
-  }
-
-  @Override
-  public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
-  {
-    return location;
-  }
-
-  @Override
-  public void setStoreLocation(String uri, Job job) throws IOException
-  {
-    KafkaOutputFormat.setOutputPath(job, new Path(uri));
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void prepareToWrite(RecordWriter writer) throws IOException
-  {
-    if (this.avroSchema == null)
-      throw new IllegalStateException("avroSchema shouldn't be null");
-
-    this.writer = (KafkaRecordWriter) writer;
-    this.datumWriter = new PigAvroDatumWriter(this.avroSchema);
-    this.os = new ByteArrayOutputStream();
-    this.encoder = new BinaryEncoder(this.os);
-  }
-
-  @Override
-  public void cleanupOnFailure(String location, Job job) throws IOException
-  {
-  }
-
-  @Override
-  public void setStoreFuncUDFContextSignature(String signature)
-  {
-  }
-
-  @Override
-  public void checkSchema(ResourceSchema schema) throws IOException
-  {
-    this.avroSchema = PigSchema2Avro.validateAndConvert(avroSchema, schema);
-  }
-
-  protected void writeEnvelope(OutputStream os, Encoder enc) throws IOException
-  {
-  }
-
-  @Override
-  public void putNext(Tuple tuple) throws IOException
-  {
-    os.reset();
-    writeEnvelope(os, this.encoder);
-    datumWriter.write(tuple, this.encoder);
-    this.encoder.flush();
-
-    try {
-      this.writer.write(null, this.os.toByteArray());
-    }
-    catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/docs/api.html
----------------------------------------------------------------------
diff --git a/docs/api.html b/docs/api.html
index 1787f06..835bdf2 100644
--- a/docs/api.html
+++ b/docs/api.html
@@ -5,9 +5,9 @@
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at
- 
+
     http://www.apache.org/licenses/LICENSE-2.0
- 
+
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,7 +28,7 @@ As of the 0.8.2 release we encourage all new development to use the new Java pro
 	&lt;/dependency&gt;
 </pre>
 
-Examples showing how to use the producer are given in the 
+Examples showing how to use the producer are given in the
 <a href="http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html" title="Kafka 0.8.2 Javadoc">javadocs</a>.
 
 <p>
@@ -144,17 +144,7 @@ class kafka.javaapi.consumer.SimpleConsumer {
 For most applications, the high level consumer Api is good enough. Some applications want features not exposed to the high level consumer yet (e.g., set initial offset when restarting the consumer). They can instead use our low level SimpleConsumer Api. The logic will be a bit more complicated and you can follow the example in
 <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example" title="Kafka 0.8 SimpleConsumer example">here</a>.
 
-<h3><a id="kafkahadoopconsumerapi">2.4 Kafka Hadoop Consumer API</a></h3>
-<p>
-Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers).
-</p>
-
-<p>
-Usage information on the hadoop consumer can be found <a href="https://github.com/linkedin/camus/">here</a>.
-</p>
-
-
-<h3><a id="newconsumerapi">2.5 New Consumer API</a></h3>
+<h3><a id="newconsumerapi">2.4 New Consumer API</a></h3>
 As of the 0.9.0 release we have added a replacement for our existing simple and high-level consumers. This client is considered beta quality. You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):
 <pre>
 	&lt;dependency&gt;
@@ -165,4 +155,4 @@ As of the 0.9.0 release we have added a replacement for our existing simple and
 </pre>
 
 Examples showing how to use the producer are given in the
-<a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaConsumer.html" title="Kafka 0.9.0 Javadoc">javadocs</a>.
\ No newline at end of file
+<a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaConsumer.html" title="Kafka 0.9.0 Javadoc">javadocs</a>.

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/docs/documentation.html
----------------------------------------------------------------------
diff --git a/docs/documentation.html b/docs/documentation.html
index 795bd1e..a123721 100644
--- a/docs/documentation.html
+++ b/docs/documentation.html
@@ -35,7 +35,7 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>, <a href="/08/documen
               <li><a href="#producerapi">2.1 Producer API</a>
               <li><a href="#highlevelconsumerapi">2.2 High Level Consumer API</a>
               <li><a href="#simpleconsumerapi">2.3 Simple Consumer API</a>
-              <li><a href="#kafkahadoopconsumerapi">2.4 Kafka Hadoop Consumer API</a>
+              <li><a href="#newconsumerapi">2.4 New Consumer API</a>
           </ul>
     <li><a href="#configuration">3. Configuration</a>
         <ul>

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/docs/implementation.html
----------------------------------------------------------------------
diff --git a/docs/implementation.html b/docs/implementation.html
index d9ffa46..b95d36f 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -5,9 +5,9 @@
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at
- 
+
     http://www.apache.org/licenses/LICENSE-2.0
- 
+
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,10 +20,10 @@
 <h4>Producer APIs</h4>
 
 <p>
-The Producer API that wraps the 2 low-level producers - <code>kafka.producer.SyncProducer</code> and <code>kafka.producer.async.AsyncProducer</code>. 
+The Producer API that wraps the 2 low-level producers - <code>kafka.producer.SyncProducer</code> and <code>kafka.producer.async.AsyncProducer</code>.
 <pre>
 class Producer<T> {
-	
+
   /* Sends the data, partitioned by key to the topic using either the */
   /* synchronous or the asynchronous producer */
   public void send(kafka.javaapi.producer.ProducerData&lt;K,V&gt; producerData);
@@ -32,21 +32,21 @@ class Producer<T> {
   /* the synchronous or the asynchronous producer */
   public void send(java.util.List&lt;kafka.javaapi.producer.ProducerData&lt;K,V&gt;&gt; producerData);
 
-  /* Closes the producer and cleans up */	
+  /* Closes the producer and cleans up */
   public void close();
 
 }
 </pre>
 
-The goal is to expose all the producer functionality through a single API to the client.  
+The goal is to expose all the producer functionality through a single API to the client.
 
 The new producer -
 <ul>
-<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data - 	
+<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data -
 <p><code>kafka.producer.Producer</code> provides the ability to batch multiple produce requests (<code>producer.type=async</code>), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered in a queue, until either <code>queue.time</code> or <code>batch.size</code> is reached. A background thread (<code>kafka.producer.async.ProducerSendThread</code>) dequeues the batch of data and lets the <code>kafka.producer.EventHandler</code> serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the <code>event.handler</code> config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the <code>kafka.producer.async.CallbackHandler</c
 ode> interface and setting <code>callback.handler</code> config parameter to that class.
 </p>
 </li>
-<li>handles the serialization of data through a user-specified <code>Encoder</code> - 
+<li>handles the serialization of data through a user-specified <code>Encoder</code> -
 <pre>
 interface Encoder&lt;T&gt; {
   public Message toMessage(T data);
@@ -54,15 +54,15 @@ interface Encoder&lt;T&gt; {
 </pre>
 <p>The default is the no-op <code>kafka.serializer.DefaultEncoder</code></p>
 </li>
-<li>provides software load balancing through an optionally user-specified <code>Partitioner</code> - 
+<li>provides software load balancing through an optionally user-specified <code>Partitioner</code> -
 <p>
-The routing decision is influenced by the <code>kafka.producer.Partitioner</code>. 
+The routing decision is influenced by the <code>kafka.producer.Partitioner</code>.
 <pre>
 interface Partitioner&lt;T&gt; {
    int partition(T key, int numPartitions);
 }
 </pre>
-The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is <code>hash(key)%numPartitions</code>. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code> config parameter.	
+The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is <code>hash(key)%numPartitions</code>. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code> config parameter.
 </p>
 </li>
 </ul>
@@ -79,11 +79,11 @@ The high-level API hides the details of brokers from the consumer and allows con
 <h5>Low-level API</h5>
 <pre>
 class SimpleConsumer {
-	
-  /* Send fetch request to a broker and get back a set of messages. */ 
+
+  /* Send fetch request to a broker and get back a set of messages. */
   public ByteBufferMessageSet fetch(FetchRequest request);
 
-  /* Send a list of fetch requests to a broker and get back a response set. */ 
+  /* Send a list of fetch requests to a broker and get back a response set. */
   public MultiFetchResponse multifetch(List&lt;FetchRequest&gt; fetches);
 
   /**
@@ -97,16 +97,16 @@ class SimpleConsumer {
 }
 </pre>
 
-The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers (such as the hadoop consumer) which have particular requirements around maintaining state.
+The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers which have particular requirements around maintaining state.
 
 <h5>High-level API</h5>
 <pre>
 
-/* create a connection to the cluster */ 
+/* create a connection to the cluster */
 ConsumerConnector connector = Consumer.create(consumerConfig);
 
 interface ConsumerConnector {
-	
+
   /**
    * This method is used to get a list of KafkaStreams, which are iterators over
    * MessageAndMetadata objects from which you can obtain messages and their
@@ -114,7 +114,7 @@ interface ConsumerConnector {
    *  Input: a map of &lt;topic, #streams&gt;
    *  Output: a map of &lt;topic, list of message streams&gt;
    */
-  public Map&lt;String,List&lt;KafkaStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt; topicCountMap); 
+  public Map&lt;String,List&lt;KafkaStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt; topicCountMap);
 
   /**
    * You can also obtain a list of KafkaStreams, that iterate over messages
@@ -126,7 +126,7 @@ interface ConsumerConnector {
 
   /* Commit the offsets of all messages consumed so far. */
   public commitOffsets()
-  
+
   /* Shut down the connector */
   public shutdown()
 }
@@ -149,27 +149,27 @@ Messages consist of a fixed-size header and variable length opaque byte array pa
 <h3><a id="messageformat">5.4 Message Format</a></h3>
 
 <pre>
-	/** 
-	 * A message. The format of an N byte message is the following: 
-	 * 
-	 * If magic byte is 0 
-	 * 
-	 * 1. 1 byte "magic" identifier to allow format changes 
-	 * 
-	 * 2. 4 byte CRC32 of the payload 
-	 * 
-	 * 3. N - 5 byte payload 
-	 * 
-	 * If magic byte is 1 
-	 * 
-	 * 1. 1 byte "magic" identifier to allow format changes 
-	 * 
-	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) 
-	 * 
-	 * 3. 4 byte CRC32 of the payload 
-	 * 
-	 * 4. N - 6 byte payload 
-	 * 
+	/**
+	 * A message. The format of an N byte message is the following:
+	 *
+	 * If magic byte is 0
+	 *
+	 * 1. 1 byte "magic" identifier to allow format changes
+	 *
+	 * 2. 4 byte CRC32 of the payload
+	 *
+	 * 3. N - 5 byte payload
+	 *
+	 * If magic byte is 1
+	 *
+	 * 1. 1 byte "magic" identifier to allow format changes
+	 *
+	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
+	 *
+	 * 3. 4 byte CRC32 of the payload
+	 *
+	 * 4. N - 6 byte payload
+	 *
 	 */
 </pre>
 </p>
@@ -183,7 +183,7 @@ The exact binary format for messages is versioned and maintained as a standard i
 <pre>
 On-disk format of a message
 
-message length : 4 bytes (value: 1+4+n) 
+message length : 4 bytes (value: 1+4+n)
 "magic" value  : 1 byte
 crc            : 4 bytes
 payload        : n bytes
@@ -289,7 +289,7 @@ When an element in a path is denoted [xyz], that means that the value of xyz is
 This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) is an error.
 </p>
 <p>
-Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).	
+Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).
 </p>
 <h4>Broker Topic Registry</h4>
 <pre>
@@ -306,7 +306,7 @@ Consumers of topics also register themselves in ZooKeeper, in order to coordinat
 </p>
 
 <p>
-Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id. 
+Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id.
 For example if one consumer is your foobar process, which is run across three machines, then you might assign this group of consumers the id "foobar". This group id is provided in the configuration of the consumer, and is your way to tell the consumer which group it belongs to.
 </p>
 
@@ -371,7 +371,7 @@ The consumer rebalancing algorithms allows all the consumers in a group to come
 Each consumer does the following during rebalancing:
 </p>
 <pre>
-   1. For each topic T that C<sub>i</sub> subscribes to 
+   1. For each topic T that C<sub>i</sub> subscribes to
    2.   let P<sub>T</sub> be all partitions producing topic T
    3.   let C<sub>G</sub> be all consumers in the same group as C<sub>i</sub> that consume topic T
    4.   sort P<sub>T</sub> (so partitions on the same broker are clustered together)

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 0875611..3d69fac 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,5 +14,5 @@
 // limitations under the License.
 
 apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
-        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
\ No newline at end of file
+include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
+        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'


[2/2] kafka git commit: KAFKA-2783; Drop outdated hadoop contrib modules

Posted by gw...@apache.org.
KAFKA-2783; Drop outdated hadoop contrib modules

Author: Grant Henke <gr...@gmail.com>

Reviewers: Gwen Shapira

Closes #466 from granthenke/drop-contrib


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69af573b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69af573b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69af573b

Branch: refs/heads/trunk
Commit: 69af573b35f04657e31f60e636aba19ffa0b2c84
Parents: 7073fa7
Author: Grant Henke <gr...@gmail.com>
Authored: Mon Nov 9 11:02:46 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Nov 9 11:02:46 2015 -0800

----------------------------------------------------------------------
 README.md                                       |   2 +-
 build.gradle                                    |  51 +--
 contrib/LICENSE                                 |   1 -
 contrib/NOTICE                                  |   1 -
 contrib/hadoop-consumer/README                  |  66 ---
 contrib/hadoop-consumer/copy-jars.sh            |  69 ---
 contrib/hadoop-consumer/hadoop-setup.sh         |  20 -
 contrib/hadoop-consumer/run-class.sh            |  65 ---
 .../main/java/kafka/etl/KafkaETLContext.java    | 270 -----------
 .../java/kafka/etl/KafkaETLInputFormat.java     |  78 ----
 .../src/main/java/kafka/etl/KafkaETLJob.java    | 172 -------
 .../src/main/java/kafka/etl/KafkaETLKey.java    | 104 -----
 .../java/kafka/etl/KafkaETLRecordReader.java    | 180 --------
 .../main/java/kafka/etl/KafkaETLRequest.java    | 129 ------
 .../src/main/java/kafka/etl/KafkaETLUtils.java  | 205 ---------
 .../src/main/java/kafka/etl/Props.java          | 458 -------------------
 .../kafka/etl/UndefinedPropertyException.java   |  28 --
 .../main/java/kafka/etl/impl/DataGenerator.java | 134 ------
 .../java/kafka/etl/impl/SimpleKafkaETLJob.java  | 104 -----
 .../kafka/etl/impl/SimpleKafkaETLMapper.java    |  91 ----
 contrib/hadoop-consumer/test/test.properties    |  42 --
 contrib/hadoop-producer/README.md               |  94 ----
 .../kafka/bridge/examples/TextPublisher.java    |  66 ---
 .../kafka/bridge/hadoop/KafkaOutputFormat.java  | 144 ------
 .../kafka/bridge/hadoop/KafkaRecordWriter.java  |  88 ----
 .../java/kafka/bridge/pig/AvroKafkaStorage.java | 115 -----
 docs/api.html                                   |  20 +-
 docs/documentation.html                         |   2 +-
 docs/implementation.html                        |  88 ++--
 settings.gradle                                 |   4 +-
 30 files changed, 54 insertions(+), 2837 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index dc15923..6baa17e 100644
--- a/README.md
+++ b/README.md
@@ -60,7 +60,7 @@ The release file can be found inside ./core/build/distributions/.
     ./gradlew -PscalaVersion=2.11.7 releaseTarGz
 
 ### Running a task for a specific project ###
-This is for 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples' and 'clients'
+This is for 'core', 'examples' and 'clients'
     ./gradlew core:jar
     ./gradlew core:test
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4ea0ee3..f9fd42a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -229,7 +229,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
 }
 
 def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
-def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools', 'streams'] + connectPkgs
+def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
 
 tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
 tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) { }
@@ -376,55 +376,6 @@ project(':core') {
   }
 }
 
-project(':contrib:hadoop-consumer') {
-  archivesBaseName = "kafka-hadoop-consumer"
-
-  dependencies {
-    compile project(':core')
-    compile "org.apache.avro:avro:1.4.0"
-    compile "org.apache.pig:pig:0.8.0"
-    compile "commons-logging:commons-logging:1.0.4"
-    compile "org.codehaus.jackson:jackson-core-asl:1.5.5"
-    compile "org.codehaus.jackson:jackson-mapper-asl:1.5.5"
-    compile "org.apache.hadoop:hadoop-core:0.20.2"
-  }
-
-  configurations {
-    // manually excludes some unnecessary dependencies
-    compile.exclude module: 'javax'
-    compile.exclude module: 'jms'
-    compile.exclude module: 'jmxri'
-    compile.exclude module: 'jmxtools'
-    compile.exclude module: 'mail'
-    compile.exclude module: 'netty'
-  }
-}
-
-project(':contrib:hadoop-producer') {
-  archivesBaseName = "kafka-hadoop-producer"
-
-  dependencies {
-    compile project(':core')
-    compile("org.apache.avro:avro:1.4.0") { force = true }
-    compile "org.apache.pig:pig:0.8.0"
-    compile "commons-logging:commons-logging:1.0.4"
-    compile "org.codehaus.jackson:jackson-core-asl:1.5.5"
-    compile "org.codehaus.jackson:jackson-mapper-asl:1.5.5"
-    compile "org.apache.hadoop:hadoop-core:0.20.2"
-    compile "org.apache.pig:piggybank:0.12.0"
-  }
-
-  configurations {
-    // manually excludes some unnecessary dependencies
-    compile.exclude module: 'javax'
-    compile.exclude module: 'jms'
-    compile.exclude module: 'jmxri'
-    compile.exclude module: 'jmxtools'
-    compile.exclude module: 'mail'
-    compile.exclude module: 'netty'
-  }
-}
-
 project(':examples') {
   archivesBaseName = "kafka-examples"
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/LICENSE
----------------------------------------------------------------------
diff --git a/contrib/LICENSE b/contrib/LICENSE
deleted file mode 120000
index ea5b606..0000000
--- a/contrib/LICENSE
+++ /dev/null
@@ -1 +0,0 @@
-../LICENSE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/NOTICE
----------------------------------------------------------------------
diff --git a/contrib/NOTICE b/contrib/NOTICE
deleted file mode 120000
index 7e1b82f..0000000
--- a/contrib/NOTICE
+++ /dev/null
@@ -1 +0,0 @@
-../NOTICE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/README
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/README b/contrib/hadoop-consumer/README
deleted file mode 100644
index 54a2666..0000000
--- a/contrib/hadoop-consumer/README
+++ /dev/null
@@ -1,66 +0,0 @@
-This is a Hadoop job that pulls data from kafka server into HDFS.
-
-It requires the following inputs from a configuration file 
-(test/test.properties is an example)
-
-kafka.etl.topic : the topic to be fetched;
-
-input		: input directory containing topic offsets and
-		  it can be generated by DataGenerator; 
-		  the number of files in this directory determines the
-		  number of mappers in the hadoop job;
-
-output		: output directory containing kafka data and updated 
-		  topic offsets;
-
-kafka.request.limit : it is used to limit the number events fetched. 
-
-KafkaETLRecordReader is a record reader associated with KafkaETLInputFormat.
-It fetches kafka data from the server. It starts from provided offsets 
-(specified by "input") and stops when it reaches the largest available offsets 
-or the specified limit (specified by "kafka.request.limit").
-
-KafkaETLJob contains some helper functions to initialize job configuration.
-
-SimpleKafkaETLJob sets up job properties and files Hadoop job. 
-
-SimpleKafkaETLMapper dumps kafka data into hdfs. 
-
-HOW TO RUN:
-In order to run this, make sure the HADOOP_HOME environment variable points to 
-your hadoop installation directory.
-
-1. Compile using "sbt" to create a package for hadoop consumer code.
-./sbt package
-
-2. Run the hadoop-setup.sh script that enables write permission on the 
-   required HDFS directory
-
-3. Produce test events in server and generate offset files
-  1) Start kafka server [ Follow the quick start - 
-                        http://sna-projects.com/kafka/quickstart.php ]
-
-  2) Update test/test.properties to change the following parameters:  
-   kafka.etl.topic 	: topic name
-   event.count		: number of events to be generated
-   kafka.server.uri     : kafka server uri;
-   input                : hdfs directory of offset files
-
-  3) Produce test events to Kafka server and generate offset files
-   ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
-
-4. Fetch generated topic into HDFS:
-  1) Update test/test.properties to change the following parameters:
-	hadoop.job.ugi	: id and group 
-	input           : input location 
-	output	        : output location 
-	kafka.request.limit: limit the number of events to be fetched; 
-			     -1 means no limitation.
-        hdfs.default.classpath.dir : hdfs location of jars
-
-  2) copy jars into hdfs
-   ./copy-jars.sh ${hdfs.default.classpath.dir}
-
-  2) Fetch data
-  ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/copy-jars.sh
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/copy-jars.sh b/contrib/hadoop-consumer/copy-jars.sh
deleted file mode 100755
index e5de1dd..0000000
--- a/contrib/hadoop-consumer/copy-jars.sh
+++ /dev/null
@@ -1,69 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ $# -lt 1 ];
-then
-  echo "USAGE: $0 dir"
-  exit 1
-fi
-
-base_dir=$(dirname $0)/../..
-
-hadoop=${HADOOP_HOME}/bin/hadoop
-
-echo "$hadoop fs -rmr $1"
-$hadoop fs -rmr $1
-
-echo "$hadoop fs -mkdir $1"
-$hadoop fs -mkdir $1
-
-# include kafka jars
-for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar;
-do
-   echo "$hadoop fs -put $file $1/"
-   $hadoop fs -put $file $1/ 
-done
-
-# include kafka jars
-echo "$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar; $1/"
-$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar $1/ 
-
-# include core lib jars
-for file in $base_dir/core/lib/*.jar;
-do
-   echo "$hadoop fs -put $file $1/"
-   $hadoop fs -put $file $1/ 
-done
-
-for file in $base_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
-do
-   echo "$hadoop fs -put $file $1/"
-   $hadoop fs -put $file $1/ 
-done
-
-# include scala library jar
-echo "$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar; $1/"
-$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar $1/
-
-local_dir=$(dirname $0)
-
-# include hadoop-consumer jars
-for file in $local_dir/lib/*.jar;
-do
-   echo "$hadoop fs -put $file $1/"
-   $hadoop fs -put $file $1/ 
-done
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/hadoop-setup.sh
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/hadoop-setup.sh b/contrib/hadoop-consumer/hadoop-setup.sh
deleted file mode 100755
index c855e66..0000000
--- a/contrib/hadoop-consumer/hadoop-setup.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-hadoop=${HADOOP_HOME}/bin/hadoop
-
-$hadoop fs -chmod ugoa+w /tmp
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/run-class.sh
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/run-class.sh b/contrib/hadoop-consumer/run-class.sh
deleted file mode 100755
index bfb4744..0000000
--- a/contrib/hadoop-consumer/run-class.sh
+++ /dev/null
@@ -1,65 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ $# -lt 1 ];
-then
-  echo "USAGE: $0 classname [opts]"
-  exit 1
-fi
-
-base_dir=$(dirname $0)/../..
-
-# include kafka jars
-for file in $base_dir/core/target/scala_2.8.0/kafka-*.jar
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-for file in $base_dir/contrib/hadoop-consumer/lib_managed/scala_2.8.0/compile/*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-local_dir=$(dirname $0)
-
-# include hadoop-consumer jars
-for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-for file in $base_dir/contrib/hadoop-consumer/lib/*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
-CLASSPATH=$CLASSPATH:$base_dir/project/boot/scala-2.8.0/lib/scala-library.jar
-
-echo $CLASSPATH
-
-CLASSPATH=dist:$CLASSPATH:${HADOOP_HOME}/conf
-
-#if [ -z "$KAFKA_OPTS" ]; then
-#  KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote"
-#fi
-
-if [ -z "$JAVA_HOME" ]; then
-  JAVA="java"
-else
-  JAVA="$JAVA_HOME/bin/java"
-fi
-
-$JAVA $KAFKA_OPTS -cp $CLASSPATH $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
deleted file mode 100644
index c9b9018..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
-
-@SuppressWarnings({ "deprecation"})
-public class KafkaETLContext {
-    
-    static protected int MAX_RETRY_TIME = 1;
-    final static String CLIENT_BUFFER_SIZE = "client.buffer.size";
-    final static String CLIENT_TIMEOUT = "client.so.timeout";
-
-    final static int DEFAULT_BUFFER_SIZE = 1 * 1024 * 1024;
-    final static int DEFAULT_TIMEOUT = 60000; // one minute
-
-    final static KafkaETLKey DUMMY_KEY = new KafkaETLKey();
-
-    protected int _index; /*index of context*/
-    protected String _input = null; /*input string*/
-    protected KafkaETLRequest _request = null;
-    protected SimpleConsumer _consumer = null; /*simple consumer*/
-
-    protected long[] _offsetRange = {0, 0};  /*offset range*/
-    protected long _offset = Long.MAX_VALUE; /*current offset*/
-    protected long _count; /*current count*/
-
-    protected FetchResponse _response = null;  /*fetch response*/
-    protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
-    protected Iterator<ByteBufferMessageSet> _respIterator = null;
-    protected int _retry = 0;
-    protected long _requestTime = 0; /*accumulative request time*/
-    protected long _startTime = -1;
-    
-    protected int _bufferSize;
-    protected int _timeout;
-    protected Reporter _reporter;
-    
-    protected MultipleOutputs _mos;
-    protected OutputCollector<KafkaETLKey, BytesWritable> _offsetOut = null;
-    protected FetchRequestBuilder builder = new FetchRequestBuilder();
-    
-    public long getTotalBytes() {
-        return (_offsetRange[1] > _offsetRange[0])? _offsetRange[1] - _offsetRange[0] : 0;
-    }
-    
-    public long getReadBytes() {
-        return _offset - _offsetRange[0];
-    }
-    
-    public long getCount() {
-        return _count;
-    }
-    
-    /**
-     * construct using input string
-     */
-    @SuppressWarnings("unchecked")
-    public KafkaETLContext(JobConf job, Props props, Reporter reporter, 
-                                    MultipleOutputs mos, int index, String input) 
-    throws Exception {
-        
-        _bufferSize = getClientBufferSize(props);
-        _timeout = getClientTimeout(props);
-        System.out.println("bufferSize=" +_bufferSize);
-        System.out.println("timeout=" + _timeout);
-        _reporter = reporter;
-        _mos = mos;
-        
-        // read topic and current offset from input
-        _index= index; 
-        _input = input;
-        _request = new KafkaETLRequest(input.trim());
-        
-        // read data from queue
-        URI uri = _request.getURI();
-        _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize, "KafkaETLContext");
-        
-        // get available offset range
-        _offsetRange = getOffsetRange();
-        System.out.println("Connected to node " + uri 
-                + " beginning reading at offset " + _offsetRange[0]
-                + " latest offset=" + _offsetRange[1]);
-
-        _offset = _offsetRange[0];
-        _count = 0;
-        _requestTime = 0;
-        _retry = 0;
-        
-        _startTime = System.currentTimeMillis();
-    }
-    
-    public boolean hasMore () {
-        return _messageIt != null && _messageIt.hasNext() 
-                || _response != null && _respIterator.hasNext()
-                || _offset < _offsetRange[1]; 
-    }
-    
-    public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException {
-        if ( !hasMore() ) return false;
-        
-        boolean gotNext = get(key, value);
-
-        if(_response != null) {
-
-            while ( !gotNext && _respIterator.hasNext()) {
-                ByteBufferMessageSet msgSet = _respIterator.next();
-                _messageIt = msgSet.iterator();
-                gotNext = get(key, value);
-            }
-        }
-        return gotNext;
-    }
-    
-    public boolean fetchMore () throws IOException {
-        if (!hasMore()) return false;
-
-        FetchRequest fetchRequest = builder
-                .clientId(_request.clientId())
-                .addFetch(_request.getTopic(), _request.getPartition(), _offset, _bufferSize)
-                .build();
-
-        long tempTime = System.currentTimeMillis();
-        _response = _consumer.fetch(fetchRequest);
-        if(_response != null) {
-            _respIterator = new ArrayList<ByteBufferMessageSet>(){{
-                add(_response.messageSet(_request.getTopic(), _request.getPartition()));
-            }}.iterator();
-        }
-        _requestTime += (System.currentTimeMillis() - tempTime);
-        
-        return true;
-    }
-    
-    @SuppressWarnings("unchecked")
-    public void output(String fileprefix) throws IOException {
-       String offsetString = _request.toString(_offset);
-
-        if (_offsetOut == null)
-            _offsetOut = (OutputCollector<KafkaETLKey, BytesWritable>)
-                                    _mos.getCollector("offsets", fileprefix+_index, _reporter);
-        _offsetOut.collect(DUMMY_KEY, new BytesWritable(offsetString.getBytes("UTF-8")));
-        
-    }
-    
-    public void close() throws IOException {
-        if (_consumer != null) _consumer.close();
-        
-        String topic = _request.getTopic();
-        long endTime = System.currentTimeMillis();
-        _reporter.incrCounter(topic, "read-time(ms)", endTime - _startTime);
-        _reporter.incrCounter(topic, "request-time(ms)", _requestTime);
-        
-        long bytesRead = _offset - _offsetRange[0];
-        double megaRead = bytesRead / (1024.0*1024.0);
-        _reporter.incrCounter(topic, "data-read(mb)", (long) megaRead);
-        _reporter.incrCounter(topic, "event-count", _count);
-    }
-    
-    protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
-        if (_messageIt != null && _messageIt.hasNext()) {
-            MessageAndOffset messageAndOffset = _messageIt.next();
-            
-            ByteBuffer buf = messageAndOffset.message().buffer();
-            int origSize = buf.remaining();
-            byte[] bytes = new byte[origSize];
-          buf.get(bytes, buf.position(), origSize);
-            value.set(bytes, 0, origSize);
-            
-            key.set(_index, _offset, messageAndOffset.message().checksum());
-            
-            _offset = messageAndOffset.nextOffset();  //increase offset
-            _count ++;  //increase count
-            
-            return true;
-        }
-        else return false;
-    }
-    
-    /**
-     * Get offset ranges
-     */
-    protected long[] getOffsetRange() throws IOException {
-
-        /* get smallest and largest offsets*/
-        long[] range = new long[2];
-
-        TopicAndPartition topicAndPartition = new TopicAndPartition(_request.getTopic(), _request.getPartition());
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
-                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
-        OffsetRequest request = new OffsetRequest(
-            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
-        long[] startOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition());
-        if (startOffsets.length != 1)
-            throw new IOException("input:" + _input + " Expect one smallest offset but get "
-                                            + startOffsets.length);
-        range[0] = startOffsets[0];
-        
-        requestInfo.clear();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
-        request = new OffsetRequest(
-            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
-        long[] endOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition());
-        if (endOffsets.length != 1)
-            throw new IOException("input:" + _input + " Expect one latest offset but get " 
-                                            + endOffsets.length);
-        range[1] = endOffsets[0];
-
-        /*adjust range based on input offsets*/
-        if ( _request.isValidOffset()) {
-            long startOffset = _request.getOffset();
-            if (startOffset > range[0]) {
-                System.out.println("Update starting offset with " + startOffset);
-                range[0] = startOffset;
-            }
-            else {
-                System.out.println("WARNING: given starting offset " + startOffset 
-                                            + " is smaller than the smallest one " + range[0] 
-                                            + ". Will ignore it.");
-            }
-        }
-        System.out.println("Using offset range [" + range[0] + ", " + range[1] + "]");
-        return range;
-    }
-    
-    public static int getClientBufferSize(Props props) throws Exception {
-        return props.getInt(CLIENT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
-    }
-
-    public static int getClientTimeout(Props props) throws Exception {
-        return props.getInt(CLIENT_TIMEOUT, DEFAULT_TIMEOUT);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
deleted file mode 100644
index ddd6b72..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Map;
-import kafka.consumer.SimpleConsumer;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
-
-
-@SuppressWarnings("deprecation")
-public class KafkaETLInputFormat 
-extends SequenceFileInputFormat<KafkaETLKey, BytesWritable> {
-
-    protected Props _props;
-    protected int _bufferSize;
-    protected int _soTimeout;
-
-    protected Map<Integer, URI> _nodes;
-    protected int _partition;
-    protected int _nodeId;
-    protected String _topic;
-    protected SimpleConsumer _consumer;
-
-    protected MultipleOutputs _mos;
-    protected OutputCollector<BytesWritable, BytesWritable> _offsetOut = null;
-
-    protected long[] _offsetRange;
-    protected long _startOffset;
-    protected long _offset;
-    protected boolean _toContinue = true;
-    protected int _retry;
-    protected long _timestamp;
-    protected long _count;
-    protected boolean _ignoreErrors = false;
-
-    @Override
-    public RecordReader<KafkaETLKey, BytesWritable> getRecordReader(InputSplit split,
-                                    JobConf job, Reporter reporter)
-                                    throws IOException {
-        return new KafkaETLRecordReader(split, job, reporter);
-    }
-
-    @Override
-    protected boolean isSplitable(FileSystem fs, Path file) {
-        return super.isSplitable(fs, file);
-    }
-
-    @Override
-    public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
-        return super.getSplits(conf, numSplits);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
deleted file mode 100644
index 1a4bcba..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-
-import java.net.URI;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
-
-@SuppressWarnings("deprecation")
-public class KafkaETLJob {
-    
-    public static final String HADOOP_PREFIX = "hadoop-conf.";
-    /**
-     * Create a job configuration
-     */
-    @SuppressWarnings("rawtypes")
-    public static JobConf createJobConf(String name, String topic, Props props, Class classobj) 
-    throws Exception {
-        JobConf conf = getJobConf(name, props, classobj);
-        
-        conf.set("topic", topic);
-        
-        // input format
-        conf.setInputFormat(KafkaETLInputFormat.class);
-
-        //turn off mapper speculative execution
-        conf.setMapSpeculativeExecution(false);
-        
-        // setup multiple outputs
-        MultipleOutputs.addMultiNamedOutput(conf, "offsets", SequenceFileOutputFormat.class, 
-                    KafkaETLKey.class, BytesWritable.class);
-
-
-        return conf;
-    }
-    
-    /**
-     * Helper function to initialize a job configuration
-     */
-    public static JobConf getJobConf(String name, Props props, Class classobj) throws Exception {
-        JobConf conf = new JobConf();
-        // set custom class loader with custom find resource strategy.
-
-        conf.setJobName(name);
-        String hadoop_ugi = props.getProperty("hadoop.job.ugi", null);
-        if (hadoop_ugi != null) {
-            conf.set("hadoop.job.ugi", hadoop_ugi);
-        }
-
-        if (props.getBoolean("is.local", false)) {
-            conf.set("mapred.job.tracker", "local");
-            conf.set("fs.default.name", "file:///");
-            conf.set("mapred.local.dir", "/tmp/map-red");
-
-            info("Running locally, no hadoop jar set.");
-        } else {
-            setClassLoaderAndJar(conf, classobj);
-            info("Setting hadoop jar file for class:" + classobj + "  to " + conf.getJar());
-            info("*************************************************************************");
-            info("          Running on Real Hadoop Cluster(" + conf.get("mapred.job.tracker") + ")           ");
-            info("*************************************************************************");
-        }
-
-        // set JVM options if present
-        if (props.containsKey("mapred.child.java.opts")) {
-            conf.set("mapred.child.java.opts", props.getProperty("mapred.child.java.opts"));
-            info("mapred.child.java.opts set to " + props.getProperty("mapred.child.java.opts"));
-        }
-
-        // Adds External jars to hadoop classpath
-        String externalJarList = props.getProperty("hadoop.external.jarFiles", null);
-        if (externalJarList != null) {
-            String[] jarFiles = externalJarList.split(",");
-            for (String jarFile : jarFiles) {
-                info("Adding extenral jar File:" + jarFile);
-                DistributedCache.addFileToClassPath(new Path(jarFile), conf);
-            }
-        }
-
-        // Adds distributed cache files
-        String cacheFileList = props.getProperty("hadoop.cache.files", null);
-        if (cacheFileList != null) {
-            String[] cacheFiles = cacheFileList.split(",");
-            for (String cacheFile : cacheFiles) {
-                info("Adding Distributed Cache File:" + cacheFile);
-                DistributedCache.addCacheFile(new URI(cacheFile), conf);
-            }
-        }
-
-        // Adds distributed cache files
-        String archiveFileList = props.getProperty("hadoop.cache.archives", null);
-        if (archiveFileList != null) {
-            String[] archiveFiles = archiveFileList.split(",");
-            for (String archiveFile : archiveFiles) {
-                info("Adding Distributed Cache Archive File:" + archiveFile);
-                DistributedCache.addCacheArchive(new URI(archiveFile), conf);
-            }
-        }
-
-        String hadoopCacheJarDir = props.getProperty("hdfs.default.classpath.dir", null);
-        if (hadoopCacheJarDir != null) {
-            FileSystem fs = FileSystem.get(conf);
-            if (fs != null) {
-                FileStatus[] status = fs.listStatus(new Path(hadoopCacheJarDir));
-
-                if (status != null) {
-                    for (int i = 0; i < status.length; ++i) {
-                        if (!status[i].isDir()) {
-                            Path path = new Path(hadoopCacheJarDir, status[i].getPath().getName());
-                            info("Adding Jar to Distributed Cache Archive File:" + path);
-
-                            DistributedCache.addFileToClassPath(path, conf);
-                        }
-                    }
-                } else {
-                    info("hdfs.default.classpath.dir " + hadoopCacheJarDir + " is empty.");
-                }
-            } else {
-                info("hdfs.default.classpath.dir " + hadoopCacheJarDir + " filesystem doesn't exist");
-            }
-        }
-
-        // May want to add this to HadoopUtils, but will await refactoring
-        for (String key : props.stringPropertyNames()) {
-            String lowerCase = key.toLowerCase();
-            if (lowerCase.startsWith(HADOOP_PREFIX)) {
-                String newKey = key.substring(HADOOP_PREFIX.length());
-                conf.set(newKey, props.getProperty(key));
-            }
-        }
-
-        KafkaETLUtils.setPropsInJob(conf, props);
-        
-        return conf;
-    }
-
-    public static void info(String message) {
-        System.out.println(message);
-    }
-
-    public static void setClassLoaderAndJar(JobConf conf,
-            @SuppressWarnings("rawtypes") Class jobClass) {
-        conf.setClassLoader(Thread.currentThread().getContextClassLoader());
-        String jar = KafkaETLUtils.findContainingJar(jobClass, Thread
-                .currentThread().getContextClassLoader());
-        if (jar != null) {
-            conf.setJar(jar);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
deleted file mode 100644
index aafecea..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.hadoop.io.WritableComparable;
-
-public class KafkaETLKey implements WritableComparable<KafkaETLKey>{
-
-    protected int _inputIndex;
-    protected long _offset;
-    protected long _checksum;
-    
-    /**
-     * dummy empty constructor
-     */
-    public KafkaETLKey() {
-        _inputIndex = 0;
-        _offset = 0;
-        _checksum = 0;
-    }
-    
-    public KafkaETLKey (int index, long offset) {
-        _inputIndex =  index;
-        _offset = offset;
-        _checksum = 0;
-    }
-    
-    public KafkaETLKey (int index, long offset, long checksum) {
-        _inputIndex =  index;
-        _offset = offset;
-        _checksum = checksum;
-    }
-    
-    public void set(int index, long offset, long checksum) {
-        _inputIndex = index;
-        _offset = offset;
-        _checksum = checksum;
-    }
-    
-    public int getIndex() {
-        return _inputIndex;
-    }
-    
-    public long getOffset() {
-        return _offset;
-    }
-    
-    public long getChecksum() {
-        return _checksum;
-    }
-    
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        _inputIndex = in.readInt(); 
-        _offset = in.readLong();
-        _checksum = in.readLong();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(_inputIndex);
-        out.writeLong(_offset);
-        out.writeLong(_checksum);
-    }
-
-    @Override
-    public int compareTo(KafkaETLKey o) {
-        if (_inputIndex != o._inputIndex)
-            return _inputIndex = o._inputIndex;
-        else {
-            if  (_offset > o._offset) return 1;
-            else if (_offset < o._offset) return -1;
-            else {
-                if  (_checksum > o._checksum) return 1;
-                else if (_checksum < o._checksum) return -1;
-                else return 0;
-            }
-        }
-    }
-    
-    @Override
-    public String toString() {
-        return "index=" + _inputIndex + " offset=" + _offset + " checksum=" + _checksum;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
deleted file mode 100644
index f040fbe..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import kafka.common.KafkaException;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
-
-@SuppressWarnings({ "deprecation" })
-public class KafkaETLRecordReader 
-extends SequenceFileRecordReader<KafkaETLKey, BytesWritable> {
-
-    /* max number of retries */
-    protected Props _props;   /*properties*/
-    protected JobConf _job;
-    protected Reporter _reporter ;
-    protected MultipleOutputs _mos;
-    protected List<KafkaETLContext> _contextList;
-    protected int _contextIndex ;
-    
-    protected long _totalBytes;
-    protected long _readBytes;
-    protected long _readCounts;
-    
-    protected String _attemptId = null;
-    
-    private static long _limit = 100; /*for testing only*/
-    
-    public KafkaETLRecordReader(InputSplit split, JobConf job, Reporter reporter) 
-    throws IOException {
-       super(job, (FileSplit) split);
-       
-       _props = KafkaETLUtils.getPropsFromJob(job);
-       _contextList = new ArrayList<KafkaETLContext>();
-       _job = job;
-       _reporter = reporter;
-       _contextIndex = -1;
-       _mos = new MultipleOutputs(job);
-       try {
-           _limit = _props.getInt("kafka.request.limit", -1);
-           
-           /*get attemp id*/
-           String taskId = _job.get("mapred.task.id");
-           if (taskId == null) {
-               throw new KafkaException("Configuration does not contain the property mapred.task.id");
-           }
-           String[] parts = taskId.split("_");
-           if (    parts.length != 6 || !parts[0].equals("attempt") 
-                || (!"m".equals(parts[3]) && !"r".equals(parts[3]))) {
-                   throw new KafkaException("TaskAttemptId string : " + taskId + " is not properly formed");
-           }
-          _attemptId = parts[4]+parts[3];
-       }catch (Exception e) {
-           throw new IOException (e);
-       }
-    }
-
-    @Override
-    public synchronized void close() throws IOException {
-        super.close();
-        
-        /* now record some stats */
-        for (KafkaETLContext context: _contextList) {
-            context.output(_attemptId);
-            context.close();
-        }
-        
-        _mos.close();
-    }
-
-    @Override
-    public KafkaETLKey createKey() {
-        return super.createKey();
-    }
-
-    @Override
-    public BytesWritable createValue() {
-        return super.createValue();
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-        if (_totalBytes == 0) return 0f;
-        
-        if (_contextIndex >= _contextList.size()) return 1f;
-        
-        if (_limit < 0) {
-            double p = ( _readBytes + getContext().getReadBytes() ) / ((double) _totalBytes);
-            return (float)p;
-        }
-        else {
-            double p = (_readCounts + getContext().getCount()) / ((double)_limit * _contextList.size());
-            return (float)p;
-        }
-    }
-
-    @Override
-    public synchronized boolean next(KafkaETLKey key, BytesWritable value)
-                                    throws IOException {
-    try{
-        if (_contextIndex < 0) { /* first call, get all requests */
-            System.out.println("RecordReader.next init()");
-            _totalBytes = 0;
-            
-            while ( super.next(key, value)) {
-                String input = new String(value.getBytes(), "UTF-8");
-                int index = _contextList.size();
-                KafkaETLContext context = new KafkaETLContext(
-                                              _job, _props, _reporter, _mos, index, input);
-                _contextList.add(context);
-                _totalBytes += context.getTotalBytes();
-            }
-            System.out.println("Number of requests=" + _contextList.size());
-            
-            _readBytes = 0;
-            _readCounts = 0;
-            _contextIndex = 0;
-        }
-        
-        while (_contextIndex < _contextList.size()) {
-            
-            KafkaETLContext currContext = getContext();
-            
-            while (currContext.hasMore() && 
-                       (_limit < 0 || currContext.getCount() < _limit)) {
-                
-                if (currContext.getNext(key, value)) {
-                    //System.out.println("RecordReader.next get (key,value)");
-                    return true;
-                }
-                else {
-                    //System.out.println("RecordReader.next fetch more");
-                    currContext.fetchMore();
-                }
-            }
-            
-            _readBytes += currContext.getReadBytes();
-            _readCounts += currContext.getCount();
-            _contextIndex ++;
-            System.out.println("RecordReader.next will get from request " + _contextIndex);
-       }
-    }catch (Exception e) {
-        throw new IOException (e);
-    }
-    return false;
-    }
-    
-    protected KafkaETLContext getContext() throws IOException{
-        if (_contextIndex >= _contextList.size()) 
-            throw new IOException ("context index " + _contextIndex + " is out of bound " 
-                                            + _contextList.size());
-        return _contextList.get(_contextIndex);
-    }
-
-    
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java
deleted file mode 100644
index 87df0ea..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-
-public class KafkaETLRequest {
-    public static long DEFAULT_OFFSET = -1;
-    public static String DELIM = "\t";
-    
-    String _topic;
-    URI _uri;
-    int _partition;
-    long _offset = DEFAULT_OFFSET;
-    String _clientId = "KafkaHadoopETL";
-    
-    public KafkaETLRequest() {
-        
-    }
-    
-    public KafkaETLRequest(String input) throws IOException {
-        //System.out.println("Init request from " + input);
-        String[] pieces = input.trim().split(DELIM);
-        if (pieces.length != 4)
-            throw new IOException( input + 
-                                            " : input must be in the form 'url" + DELIM +
-                                            "topic" + DELIM +"partition" + DELIM +"offset'");
-
-        try {
-            _uri = new URI (pieces[0]); 
-        }catch (java.net.URISyntaxException e) {
-            throw new IOException (e);
-        }
-        _topic = pieces[1];
-        _partition = Integer.valueOf(pieces[2]);
-        _offset = Long.valueOf(pieces[3]);
-    }
-    
-    public KafkaETLRequest(String node, String topic, String partition, String offset, 
-                                    Map<String, String> nodes) throws IOException {
-
-        Integer nodeId = Integer.parseInt(node);
-        String uri = nodes.get(nodeId.toString());
-        if (uri == null) throw new IOException ("Cannot form node for id " + nodeId);
-        
-        try {
-            _uri = new URI (uri); 
-        }catch (java.net.URISyntaxException e) {
-            throw new IOException (e);
-        }
-        _topic = topic;
-        _partition = Integer.valueOf(partition);
-        _offset = Long.valueOf(offset);
-    }
-    
-    public KafkaETLRequest(String topic, String uri, int partition) throws URISyntaxException {
-        _topic = topic;
-        _uri = new URI(uri);
-        _partition = partition;
-    }
-    
-    public void setDefaultOffset() {
-        _offset = DEFAULT_OFFSET;
-    }
-    
-    public void setOffset(long offset) {
-        _offset = offset;
-    }
-    
-    public String getTopic() { return _topic; }
-    public URI getURI () { return _uri; }
-    public int getPartition() { return _partition; }
-    public long getOffset() { return _offset; }
-    public String clientId() { return _clientId; }
-
-    public boolean isValidOffset() {
-        return _offset >= 0;
-    }
-    
-    @Override
-    public boolean equals(Object o) {
-        if (! (o instanceof KafkaETLRequest))
-            return false;
-        
-        KafkaETLRequest r = (KafkaETLRequest) o;
-        return this._topic.equals(r._topic) ||
-                    this._uri.equals(r._uri) ||
-                    this._partition == r._partition;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString(0).hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return toString(_offset);
-    }
-    
-
-    public String toString (long offset) {
-    
-        return 
-        _uri + DELIM +
-        _topic + DELIM +
-        _partition + DELIM +
-       offset;
-    }
-    
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
deleted file mode 100644
index 02d79a1..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.etl;
-
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.BytesWritable;
-
-public class KafkaETLUtils {
-
-	public static PathFilter PATH_FILTER = new PathFilter() {
-		@Override
-		public boolean accept(Path path) {
-			return !path.getName().startsWith("_")
-					&& !path.getName().startsWith(".");
-		}
-	};
-
-	
-	public static Path getLastPath(Path path, FileSystem fs) throws IOException {
-
-		FileStatus[] statuses = fs.listStatus(path, PATH_FILTER);
-
-		if (statuses.length == 0) {
-			return path;
-		} else {
-			Arrays.sort(statuses);
-			return statuses[statuses.length - 1].getPath();
-		}
-	}
-
-	public static String getFileName(Path path) throws IOException {
-		String fullname = path.toUri().toString();
-		String[] parts = fullname.split(Path.SEPARATOR);
-		if (parts.length < 1)
-			throw new IOException("Invalid path " + fullname);
-		return parts[parts.length - 1];
-	}
-
-	public static List<String> readText(FileSystem fs, String inputFile)
-			throws IOException, FileNotFoundException {
-		Path path = new Path(inputFile);
-		return readText(fs, path);
-	}
-
-	public static List<String> readText(FileSystem fs, Path path)
-			throws IOException, FileNotFoundException {
-		if (!fs.exists(path)) {
-			throw new FileNotFoundException("File " + path + " doesn't exist!");
-		}
-		BufferedReader in = new BufferedReader(new InputStreamReader(
-				fs.open(path)));
-		List<String> buf = new ArrayList<String>();
-		String line = null;
-
-		while ((line = in.readLine()) != null) {
-			if (line.trim().length() > 0)
-				buf.add(new String(line.trim()));
-		}
-		in.close();
-		return buf;
-	}
-
-	public static void writeText(FileSystem fs, Path outPath, String content)
-			throws IOException {
-		long timestamp = System.currentTimeMillis();
-		String localFile = "/tmp/KafkaETL_tmp_" + timestamp;
-		PrintWriter writer = new PrintWriter(new FileWriter(localFile));
-		writer.println(content);
-		writer.close();
-
-		Path src = new Path(localFile);
-		fs.moveFromLocalFile(src, outPath);
-	}
-
-	public static Props getPropsFromJob(Configuration conf) {
-		String propsString = conf.get("kafka.etl.props");
-		if (propsString == null)
-			throw new UndefinedPropertyException(
-					"The required property kafka.etl.props was not found in the Configuration.");
-		try {
-			ByteArrayInputStream input = new ByteArrayInputStream(
-					propsString.getBytes("UTF-8"));
-			Properties properties = new Properties();
-			properties.load(input);
-			return new Props(properties);
-		} catch (IOException e) {
-			throw new RuntimeException("This is not possible!", e);
-		}
-	}
-
-	 public static void setPropsInJob(Configuration conf, Props props)
-	  {
-	    ByteArrayOutputStream output = new ByteArrayOutputStream();
-	    try
-	    {
-	      props.store(output);
-	      conf.set("kafka.etl.props", new String(output.toByteArray(), "UTF-8"));
-	    }
-	    catch (IOException e)
-	    {
-	      throw new RuntimeException("This is not possible!", e);
-	    }
-	  }
-	 
-	public static Props readProps(String file) throws IOException {
-		Path path = new Path(file);
-		FileSystem fs = path.getFileSystem(new Configuration());
-		if (fs.exists(path)) {
-			InputStream input = fs.open(path);
-			try {
-				// wrap it up in another layer so that the user can override
-				// properties
-				Props p = new Props(input);
-				return new Props(p);
-			} finally {
-				input.close();
-			}
-		} else {
-			return new Props();
-		}
-	}
-
-	public static String findContainingJar(
-			@SuppressWarnings("rawtypes") Class my_class, ClassLoader loader) {
-		String class_file = my_class.getName().replaceAll("\\.", "/")
-				+ ".class";
-		return findContainingJar(class_file, loader);
-	}
-
-	public static String findContainingJar(String fileName, ClassLoader loader) {
-		try {
-			for (@SuppressWarnings("rawtypes")
-			Enumeration itr = loader.getResources(fileName); itr
-					.hasMoreElements();) {
-				URL url = (URL) itr.nextElement();
-				// logger.info("findContainingJar finds url:" + url);
-				if ("jar".equals(url.getProtocol())) {
-					String toReturn = url.getPath();
-					if (toReturn.startsWith("file:")) {
-						toReturn = toReturn.substring("file:".length());
-					}
-					toReturn = URLDecoder.decode(toReturn, "UTF-8");
-					return toReturn.replaceAll("!.*$", "");
-				}
-			}
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-		return null;
-	}
-
-    public static byte[] getBytes(BytesWritable val) {
-        
-        byte[] buffer = val.getBytes();
-        
-        /* FIXME: remove the following part once the below gira is fixed
-         * https://issues.apache.org/jira/browse/HADOOP-6298
-         */
-        long len = val.getLength();
-        byte [] bytes = buffer;
-        if (len < buffer.length) {
-            bytes = new byte[(int) len];
-            System.arraycopy(buffer, 0, bytes, 0, (int)len);
-        }
-        
-        return bytes;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
deleted file mode 100644
index 71eb80f..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.etl;
-
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Constructor;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import kafka.common.KafkaException;
-import org.apache.log4j.Logger;
-
-public class Props extends Properties {
-
-	private static final long serialVersionUID = 1L;
-	private static Logger logger = Logger.getLogger(Props.class);
-	
-	/**
-	 * default constructor
-	 */
-	public Props() {
-		super();
-	}
-
-	/**
-	 * copy constructor 
-	 * @param props
-	 */
-	public Props(Props props) {
-		if (props != null) {
-			this.put(props);
-		}
-	}
-	
-	/**
-	 * construct props from a list of files
-	 * @param files		paths of files
-	 * @throws FileNotFoundException
-	 * @throws IOException
-	 */
-	public Props(String... files) throws FileNotFoundException, IOException {
-		this(Arrays.asList(files));
-	}
-
-	/**
-	 * construct props from a list of files
-	 * @param files		paths of files
-	 * @throws FileNotFoundException
-	 * @throws IOException
-	 */
-	public Props(List<String> files) throws FileNotFoundException, IOException {
-
-		for (int i = 0; i < files.size(); i++) {
-			InputStream input = new BufferedInputStream(new FileInputStream(
-					new File(files.get(i)).getAbsolutePath()));
-			super.load(input);
-			input.close();
-		}
-	}
-
-	/**
-	 * construct props from a list of input streams
-	 * @param inputStreams
-	 * @throws IOException
-	 */
-	public Props(InputStream... inputStreams) throws IOException {
-		for (InputStream stream : inputStreams)
-			super.load(stream);
-	}
-
-	/**
-	 * construct props from a list of maps
-	 * @param props
-	 */
-	public Props(Map<String, String>... props) {
-		for (int i = props.length - 1; i >= 0; i--)
-			super.putAll(props[i]);
-	}
-
-	/**
-	 * construct props from a list of Properties
-	 * @param properties
-	 */
-	public Props(Properties... properties) {
-		for (int i = properties.length - 1; i >= 0; i--){
-			this.put(properties[i]);
-		}
-	}
-
-	/**
-	 * build props from a list of strings and interpret them as
-	 * key, value, key, value,....
-	 * 
-	 * @param args
-	 * @return props
-	 */
-	@SuppressWarnings("unchecked")
-	public static Props of(String... args) {
-		if (args.length % 2 != 0)
-			throw new KafkaException(
-					"Must have an equal number of keys and values.");
-		Map<String, String> vals = new HashMap<String, String>(args.length / 2);
-		for (int i = 0; i < args.length; i += 2)
-			vals.put(args[i], args[i + 1]);
-		return new Props(vals);
-	}
-
-	/**
-	 * Put the given Properties into the Props. 
-	 * 
-	 * @param properties
-	 *            The properties to put
-	 * 
-	 */
-	public void put(Properties properties) {
-		for (String propName : properties.stringPropertyNames()) {
-			super.put(propName, properties.getProperty(propName));
-		}
-	}
-
-	/**
-	 * get property of "key" and split the value by " ," 
-	 * @param key		
-	 * @return list of values
-	 */
-	public List<String> getStringList(String key) {
-		return getStringList(key, "\\s*,\\s*");
-	}
-
-	/**
-	 * get property of "key" and split the value by "sep"
-	 * @param key
-	 * @param sep
-	 * @return string list of values
-	 */
-	public List<String> getStringList(String key, String sep) {
-		String val =  super.getProperty(key);
-		if (val == null || val.trim().length() == 0)
-			return Collections.emptyList();
-
-		if (containsKey(key))
-			return Arrays.asList(val.split(sep));
-		else
-			throw new UndefinedPropertyException("Missing required property '"
-					+ key + "'");
-	}
-
-	/**
-	 * get string list with default value. default delimiter is ","
-	 * @param key
-	 * @param defaultValue
-	 * @return string list of values
-	 */
-	public List<String> getStringList(String key, List<String> defaultValue) {
-		if (containsKey(key))
-			return getStringList(key);
-		else
-			return defaultValue;
-	}
-
-	/**
-	 * get string list with default value
-	 * @param key
-	 * @param defaultValue
-	 * @return string list of values
-	 */
-	public List<String> getStringList(String key, List<String> defaultValue,
-			String sep) {
-		if (containsKey(key))
-			return getStringList(key, sep);
-		else
-			return defaultValue;
-	}
-
-	@SuppressWarnings("unchecked")
-	protected <T> T getValue(String key, T defaultValue) 
-	throws Exception {
-		
-		if (containsKey(key)) {
-			Object value = super.get(key);
-			if (value.getClass().isInstance(defaultValue)) {
-				return (T)value;
-			} else if (value instanceof String) {
-				// call constructor(String) to initialize it
-				@SuppressWarnings("rawtypes")
-				Constructor ct = defaultValue.getClass().getConstructor(String.class);
-				String v = ((String)value).trim();
-				Object ret = ct.newInstance(v);
-				return (T) ret;
-			}
-			else throw new UndefinedPropertyException ("Property " + key + 
-					": cannot convert value of " + value.getClass().getName() + 
-					" to " + defaultValue.getClass().getName());
-		}
-		else {
-			return defaultValue;
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	protected <T> T getValue(String key, Class<T> mclass) 
-	throws Exception {
-		
-		if (containsKey(key)) {
-			Object value = super.get(key);
-			if (value.getClass().equals(mclass)) {
-				return (T)value;
-			} else if (value instanceof String) {
-				// call constructor(String) to initialize it
-				@SuppressWarnings("rawtypes")
-				Constructor ct = mclass.getConstructor(String.class);
-				String v = ((String)value).trim();
-				Object ret = ct.newInstance(v);
-				return (T) ret;
-			}
-			else throw new UndefinedPropertyException ("Property " + key + 
-					": cannot convert value of " + value.getClass().getName() + 
-					" to " + mclass.getClass().getName());
-		}
-		else {
-			throw new UndefinedPropertyException ("Missing required property '"
-					+ key + "'");
-		}
-	}
-
-	/**
-	 * get boolean value with default value
-	 * @param key
-	 * @param defaultValue
-	 * @return boolean value
-	 * @throws Exception 	if value is not of type boolean or string
-	 */
-	public Boolean getBoolean(String key, Boolean defaultValue) 
-	throws Exception {
-		return getValue (key, defaultValue);
-	}
-
-	/**
-	 * get boolean value
-	 * @param key
-	 * @return boolean value
-	 * @throws Exception 	if value is not of type boolean or string or 
-	 * 										if value doesn't exist
-	 */
-	public Boolean getBoolean(String key) throws Exception {
-		return getValue (key, Boolean.class);
-	}
-
-	/**
-	 * get long value with default value
-	 * @param name
-	 * @param defaultValue
-	 * @return long value
-	 * @throws Exception 	if value is not of type long or string
-	 */
-	public Long getLong(String name, Long defaultValue) 
-	throws Exception {
-		return getValue(name, defaultValue);
-	}
-
-	/**
-	 * get long value
-	 * @param name
-	 * @return long value
-	 * @throws Exception 	if value is not of type long or string or 
-	 * 										if value doesn't exist
-	 */
-	public Long getLong(String name) throws Exception  {
-		return getValue (name, Long.class);
-	}
-
-	/**
-	 * get integer value with default value
-	 * @param name
-	 * @param defaultValue
-	 * @return integer value
-	 * @throws Exception 	if value is not of type integer or string
-	 */
-	public Integer getInt(String name, Integer defaultValue) 
-	throws Exception  {
-		return getValue(name, defaultValue);
-	}
-
-	/**
-	 * get integer value
-	 * @param name
-	 * @return integer value
-	 * @throws Exception 	if value is not of type integer or string or 
-	 * 										if value doesn't exist
-	 */
-	public Integer getInt(String name) throws Exception {
-		return getValue (name, Integer.class);
-	}
-
-	/**
-	 * get double value with default value
-	 * @param name
-	 * @param defaultValue
-	 * @return double value
-	 * @throws Exception 	if value is not of type double or string
-	 */
-	public Double getDouble(String name, double defaultValue) 
-	throws Exception {
-		return getValue(name, defaultValue);
-	}
-
-	/**
-	 * get double value
-	 * @param name
-	 * @return double value
-	 * @throws Exception 	if value is not of type double or string or 
-	 * 										if value doesn't exist
-	 */
-	public double getDouble(String name) throws Exception {
-		return getValue(name, Double.class);
-	}
-
-	/**
-	 * get URI value with default value
-	 * @param name
-	 * @param defaultValue
-	 * @return URI value
-	 * @throws Exception 	if value is not of type URI or string 
-	 */
-	public URI getUri(String name, URI defaultValue) throws Exception {
-		return getValue(name, defaultValue);
-	}
-
-	/**
-	 * get URI value
-	 * @param name
-	 * @param defaultValue
-	 * @return URI value
-	 * @throws Exception 	if value is not of type URI or string 
-	 */
-	public URI getUri(String name, String defaultValue) 
-	throws Exception {
-		URI defaultV = new URI(defaultValue);
-		return getValue(name, defaultV);
-	}
-
-	/**
-	 * get URI value
-	 * @param name
-	 * @return URI value
-	 * @throws Exception 	if value is not of type URI or string or 
-	 * 										if value doesn't exist
-	 */
-	public URI getUri(String name) throws Exception {
-		return getValue(name, URI.class);
-	}
-
-	/**
-	 * compare two props 
-	 * @param p
-	 * @return true or false
-	 */
-	public boolean equalsProps(Props p) {
-		if (p == null) {
-			return false;
-		}
-
-		final Set<String> myKeySet = getKeySet();
-		for (String s : myKeySet) {
-			if (!get(s).equals(p.get(s))) {
-				return false;
-			}
-		}
-
-		return myKeySet.size() == p.getKeySet().size();
-	}
-
-
-	/**
-	 * Get a map of all properties by string prefix
-	 * 
-	 * @param prefix
-	 *            The string prefix
-	 */
-	public Map<String, String> getMapByPrefix(String prefix) {
-		Map<String, String> values = new HashMap<String, String>();
-
-		for (String key : super.stringPropertyNames()) {
-			if (key.startsWith(prefix)) {
-				values.put(key.substring(prefix.length()), super.getProperty(key));
-			}
-		}
-		return values;
-	}
-
-    /**
-     * Store all properties
-     * 
-     * @param out The stream to write to
-     * @throws IOException If there is an error writing
-     */
-    public void store(OutputStream out) throws IOException {
-           super.store(out, null);
-    }
-    
-    /**
-     * get all property names
-     * @return set of property names
-     */
-	public Set<String> getKeySet() {
-		return super.stringPropertyNames();
-	}
-
-	/**
-	 * log properties
-	 * @param comment
-	 */
-	public void logProperties(String comment) {
-		logger.info(comment);
-
-		for (String key : getKeySet()) {
-			logger.info("  key=" + key + " value=" + get(key));
-		}
-	}
-
-	/**
-	 * clone a Props
-	 * @param p
-	 * @return props
-	 */
-	public static Props clone(Props p) {
-		return new Props(p);
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java
deleted file mode 100644
index 9278122..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.etl;
-
-public class UndefinedPropertyException extends RuntimeException {
-
-	private static final long serialVersionUID = 1;
-
-	public UndefinedPropertyException(String message) {
-		super(message);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
deleted file mode 100644
index d27a511..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.etl.impl;
-
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import kafka.etl.KafkaETLKey;
-import kafka.etl.KafkaETLRequest;
-import kafka.etl.Props;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.ProducerConfig;
-import kafka.producer.KeyedMessage;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-
-import static org.apache.kafka.common.utils.Utils.formatAddress;
-
-/**
- * Use this class to produce test events to Kafka server. Each event contains a
- * random timestamp in text format.
- */
-@SuppressWarnings("deprecation")
-public class DataGenerator {
-
-	protected final static Random RANDOM = new Random(
-			System.currentTimeMillis());
-
-	protected Props _props;
-	protected Producer _producer = null;
-	protected URI _uri = null;
-	protected String _topic;
-	protected int _count;
-	protected String _offsetsDir;
-	protected final int TCP_BUFFER_SIZE = 300 * 1000;
-	protected final int CONNECT_TIMEOUT = 20000; // ms
-	protected final int RECONNECT_INTERVAL = Integer.MAX_VALUE; // ms
-
-	public DataGenerator(String id, Props props) throws Exception {
-		_props = props;
-		_topic = props.getProperty("kafka.etl.topic");
-		System.out.println("topics=" + _topic);
-		_count = props.getInt("event.count");
-
-		_offsetsDir = _props.getProperty("input");
-		
-		// initialize kafka producer to generate count events
-		String serverUri = _props.getProperty("kafka.server.uri");
-		_uri = new URI (serverUri);
-		
-		System.out.println("server uri:" + _uri.toString());
-        Properties producerProps = new Properties();
-        producerProps.put("metadata.broker.list", formatAddress(_uri.getHost(), _uri.getPort()));
-        producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
-        producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
-        producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
-        
-		_producer = new Producer(new ProducerConfig(producerProps));
-			
-	}
-
-	public void run() throws Exception {
-
-		List<KeyedMessage> list = new ArrayList<KeyedMessage>();
-		for (int i = 0; i < _count; i++) {
-			Long timestamp = RANDOM.nextLong();
-			if (timestamp < 0) timestamp = -timestamp;
-			byte[] bytes = timestamp.toString().getBytes("UTF8");
-            list.add(new KeyedMessage<Integer, byte[]>(_topic, null, bytes));
-		}
-		// send events
-		System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
-		_producer.send(list);
-
-		// close the producer
-		_producer.close();
-		
-		// generate offset files
-		generateOffsets();
-	}
-
-    protected void generateOffsets() throws Exception {
-        JobConf conf = new JobConf();
-        conf.set("hadoop.job.ugi", _props.getProperty("hadoop.job.ugi"));
-        conf.setCompressMapOutput(false);
-        Path outPath = new Path(_offsetsDir + Path.SEPARATOR + "1.dat");
-        FileSystem fs = outPath.getFileSystem(conf);
-        if (fs.exists(outPath)) fs.delete(outPath);
-        
-        KafkaETLRequest request =
-            new KafkaETLRequest(_topic, "tcp://" + formatAddress(_uri.getHost(), _uri.getPort()), 0);
-
-        System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString());
-        byte[] bytes = request.toString().getBytes("UTF-8");
-        KafkaETLKey dummyKey = new KafkaETLKey();
-        SequenceFile.setCompressionType(conf, SequenceFile.CompressionType.NONE);
-        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outPath, 
-                                        KafkaETLKey.class, BytesWritable.class);
-        writer.append(dummyKey, new BytesWritable(bytes));
-        writer.close();
-    }
-    
-	public static void main(String[] args) throws Exception {
-
-		if (args.length < 1)
-			throw new Exception("Usage: - config_file");
-
-		Props props = new Props(args[0]);
-		DataGenerator job = new DataGenerator("DataGenerator", props);
-		job.run();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java
deleted file mode 100644
index d269704..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLJob.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.etl.impl;
-
-import kafka.etl.KafkaETLInputFormat;
-import kafka.etl.KafkaETLJob;
-import kafka.etl.Props;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
-/**
- * This is a simple Kafka ETL job which pull text events generated by
- * DataGenerator and store them in hdfs
- */
-@SuppressWarnings("deprecation")
-public class SimpleKafkaETLJob {
-
-    protected String _name;
-    protected Props _props;
-    protected String _input;
-    protected String _output;
-    protected String _topic;
-    
-	public SimpleKafkaETLJob(String name, Props props) throws Exception {
-		_name = name;
-		_props = props;
-		
-		_input = _props.getProperty("input");
-		_output = _props.getProperty("output");
-		
-		_topic = props.getProperty("kafka.etl.topic");
-	}
-
-
-	protected JobConf createJobConf() throws Exception {
-		JobConf jobConf = KafkaETLJob.createJobConf("SimpleKafakETL", _topic, _props, getClass());
-		
-		jobConf.setMapperClass(SimpleKafkaETLMapper.class);
-		KafkaETLInputFormat.setInputPaths(jobConf, new Path(_input));
-		
-		jobConf.setOutputKeyClass(LongWritable.class);
-		jobConf.setOutputValueClass(Text.class);
-		jobConf.setOutputFormat(TextOutputFormat.class);
-		TextOutputFormat.setCompressOutput(jobConf, false);
-		Path output = new Path(_output);
-		FileSystem fs = output.getFileSystem(jobConf);
-		if (fs.exists(output)) fs.delete(output);
-		TextOutputFormat.setOutputPath(jobConf, output);
-		
-		jobConf.setNumReduceTasks(0);
-		return jobConf;
-	}
-	
-    public void execute () throws Exception {
-        JobConf conf = createJobConf();
-        RunningJob runningJob = new JobClient(conf).submitJob(conf);
-        String id = runningJob.getJobID();
-        System.out.println("Hadoop job id=" + id);
-        runningJob.waitForCompletion();
-        
-        if (!runningJob.isSuccessful()) 
-            throw new Exception("Hadoop ETL job failed! Please check status on http://"
-                                         + conf.get("mapred.job.tracker") + "/jobdetails.jsp?jobid=" + id);
-    }
-
-	/**
-	 * for testing only
-	 * 
-	 * @param args
-	 * @throws Exception
-	 */
-	public static void main(String[] args) throws Exception {
-
-		if (args.length < 1)
-			throw new Exception("Usage: - config_file");
-
-		Props props = new Props(args[0]);
-		SimpleKafkaETLJob job = new SimpleKafkaETLJob("SimpleKafkaETLJob",
-				props);
-		job.execute();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
deleted file mode 100644
index 0fea5db..0000000
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.etl.impl;
-
-import kafka.etl.KafkaETLKey;
-import kafka.etl.KafkaETLUtils;
-import kafka.message.Message;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * Simple implementation of KafkaETLMapper. It assumes that 
- * input data are text timestamp (long).
- */
-@SuppressWarnings("deprecation")
-public class SimpleKafkaETLMapper implements
-Mapper<KafkaETLKey, BytesWritable, LongWritable, Text> {
-
-    protected long _count = 0;
-    
-	protected Text getData(Message message) throws IOException {
-		ByteBuffer buf = message.payload();
-		if(buf == null)
-		  return new Text();
-		
-		byte[] array = new byte[buf.limit()];
-		buf.get(array);
-		
-		Text text = new Text( new String(array, "UTF8"));
-		return text;
-	}
-
-
-    @Override
-    public void map(KafkaETLKey key, BytesWritable val,
-            OutputCollector<LongWritable, Text> collector,
-            Reporter reporter) throws IOException {
-        
-         
-        byte[] bytes = KafkaETLUtils.getBytes(val);
-        
-        //check the checksum of message
-        Message message = new Message(ByteBuffer.wrap(bytes));
-        long checksum = key.getChecksum();
-        if (checksum != message.checksum()) 
-            throw new IOException ("Invalid message checksum " 
-                                            + message.checksum() + ". Expected " + key + ".");
-        Text data = getData (message);
-        _count ++;
-           
-        collector.collect(new LongWritable (_count), data);
-
-    }
-
-
-    @Override
-    public void configure(JobConf arg0) {
-        // TODO Auto-generated method stub
-        
-    }
-
-
-    @Override
-    public void close() throws IOException {
-        // TODO Auto-generated method stub
-        
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69af573b/contrib/hadoop-consumer/test/test.properties
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/test/test.properties b/contrib/hadoop-consumer/test/test.properties
deleted file mode 100644
index cdea8cc..0000000
--- a/contrib/hadoop-consumer/test/test.properties
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# name of test topic
-kafka.etl.topic=SimpleTestEvent
-
-# hdfs location of jars
-hdfs.default.classpath.dir=/tmp/kafka/lib
-
-# number of test events to be generated
-event.count=1000
-
-# hadoop id and group
-hadoop.job.ugi=kafka,hadoop
-
-# kafka server uri
-kafka.server.uri=tcp://localhost:9092
-
-# hdfs location of input directory 
-input=/tmp/kafka/data
-
-# hdfs location of output directory
-output=/tmp/kafka/output
-
-# limit the number of events to be fetched;
-# value -1 means no limitation
-kafka.request.limit=-1
-
-# kafka parameters
-client.buffer.size=1048576
-client.so.timeout=60000