You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/06/24 09:38:05 UTC

[1/2] FLUME-2094. Remove the deprecated Recoverable Memory Channel.

Updated Branches:
  refs/heads/trunk c059c916e -> 08843202f


http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index a7aeef4..816a5ed 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1972,31 +1972,6 @@ Example for agent named a1:
   a1.channels = c1
   a1.channels.c1.type = jdbc
 
-Recoverable Memory Channel
-~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-
-.. warning:: The Recoverable Memory Channel has been deprecated
-             in favor of the FileChannel. FileChannel is durable channel
-             and performs better than the Recoverable Memory Channel.
-
-Required properties are in **bold**.
-
-======================  ===============================================  =========================================================================
-Property Name           Default                                          Description
-======================  ===============================================  =========================================================================
-**type**                --                                               The component type name, needs to be
-                                                                         ``org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel``
-wal.dataDir             ${user.home}/.flume/recoverable-memory-channel
-wal.rollSize            (0x04000000)                                     Max size (in bytes) of a single file before we roll
-wal.minRetentionPeriod  300000                                           Min amount of time (in millis) to keep a log
-wal.workerInterval      60000                                            How often (in millis) the background worker checks for old logs
-wal.maxLogsSize         (0x20000000)                                     Total amt (in bytes) of logs to keep, excluding the current log
-capacity                100
-transactionCapacity     100
-keep-alive              3
-======================  ===============================================  =========================================================================
-
 
 File Channel
 ~~~~~~~~~~~~
@@ -3070,7 +3045,6 @@ Component Interface                                           Type Alias
 ============================================================  ======================  ====================================================================
 org.apache.flume.Channel                                      memory                  org.apache.flume.channel.MemoryChannel
 org.apache.flume.Channel                                      jdbc                    org.apache.flume.channel.jdbc.JdbcChannel
-org.apache.flume.Channel                                      recoverablememory       org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
 org.apache.flume.Channel                                      file                    org.apache.flume.channel.file.FileChannel
 org.apache.flume.Channel                                      --                      org.apache.flume.channel.PseudoTxnMemoryChannel
 org.apache.flume.Channel                                      --                      org.example.MyChannel

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6846b07..7f6f824 100644
--- a/pom.xml
+++ b/pom.xml
@@ -925,12 +925,6 @@ limitations under the License.
         <version>1.4.0-SNAPSHOT</version>
       </dependency>
 
-     <dependency>
-       <groupId>org.apache.flume.flume-ng-channels</groupId>
-       <artifactId>flume-recoverable-memory-channel</artifactId>
-       <version>1.4.0-SNAPSHOT</version>
-     </dependency>
-
       <dependency>
         <groupId>org.apache.flume.flume-ng-sinks</groupId>
         <artifactId>flume-hdfs-sink</artifactId>


[2/2] git commit: FLUME-2094. Remove the deprecated Recoverable Memory Channel.

Posted by mp...@apache.org.
FLUME-2094. Remove the deprecated Recoverable Memory Channel.

(Roshan Naik via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/08843202
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/08843202
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/08843202

Branch: refs/heads/trunk
Commit: 08843202f78e0539e7ebb48c1d1a3fe7986db899
Parents: c059c91
Author: Mike Percy <mp...@apache.org>
Authored: Mon Jun 24 00:37:06 2013 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Jun 24 00:37:06 2013 -0700

----------------------------------------------------------------------
 .../flume-recoverable-memory-channel/pom.xml    | 108 ----
 .../memory/RecoverableMemoryChannel.java        | 321 -----------
 .../memory/RecoverableMemoryChannelEvent.java   | 113 ----
 .../memory/wal/SequenceIDBuffer.java            | 126 -----
 .../channel/recoverable/memory/wal/WAL.java     | 550 -------------------
 .../recoverable/memory/wal/WALDataFile.java     | 154 ------
 .../recoverable/memory/wal/WALEntry.java        |  84 ---
 .../recoverable/memory/wal/WALReplayResult.java |  41 --
 .../memory/TestRecoverableMemoryChannel.java    | 336 -----------
 .../memory/wal/TestSequenceIDBuffer.java        |  73 ---
 .../channel/recoverable/memory/wal/TestWAL.java | 287 ----------
 .../src/test/resources/log4j.properties         |  25 -
 flume-ng-channels/pom.xml                       |   1 -
 .../conf/channel/ChannelConfiguration.java      |  10 +-
 .../apache/flume/conf/channel/ChannelType.java  |   9 +-
 flume-ng-dist/pom.xml                           |   4 -
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  26 -
 pom.xml                                         |   6 -
 18 files changed, 2 insertions(+), 2272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/pom.xml b/flume-ng-channels/flume-recoverable-memory-channel/pom.xml
deleted file mode 100644
index 80f8d21..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/pom.xml
+++ /dev/null
@@ -1,108 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>flume-ng-channels</artifactId>
-    <groupId>org.apache.flume</groupId>
-    <version>1.4.0-SNAPSHOT</version>
-  </parent>
-
-  <groupId>org.apache.flume.flume-ng-channels</groupId>
-  <artifactId>flume-recoverable-memory-channel</artifactId>
-  <name>Flume NG file backed Memory channel</name>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.rat</groupId>
-        <artifactId>apache-rat-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-sdk</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-configuration</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>${hadoop.common.artifact.id}</artifactId>
-      <optional>true</optional>
-    </dependency>
-
-  </dependencies>
-
-  <profiles>
-
-    <profile>
-      <id>hadoop-2</id>
-      <activation>
-        <property>
-          <name>hadoop.profile</name>
-          <value>2</value>
-        </property>
-      </activation>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-auth</artifactId>
-          <optional>true</optional>
-        </dependency>
-      </dependencies>
-    </profile>
-
-  </profiles>
-
-
-
-</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
deleted file mode 100644
index c7a9947..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.channel.recoverable.memory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.annotations.InterfaceAudience;
-import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.annotations.Disposable;
-import org.apache.flume.channel.BasicChannelSemantics;
-import org.apache.flume.channel.BasicTransactionSemantics;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.recoverable.memory.wal.WAL;
-import org.apache.flume.channel.recoverable.memory.wal.WALEntry;
-import org.apache.flume.channel.recoverable.memory.wal.WALReplayResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-
-/**
- * <p>
- * A durable {@link Channel} implementation that uses the local file system for
- * its storage.
- * </p>
- *
- * @deprecated The RecoverableMemoryChannel has been deprecated in favor of
- * {@link org.apache.flume.channel.file.FileChannel}, which gives better
- * performance and is also durable.
- */
-@Disposable
-@Deprecated
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class RecoverableMemoryChannel extends BasicChannelSemantics {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(RecoverableMemoryChannel.class);
-
-
-  public static final String WAL_DATA_DIR = "wal.dataDir";
-  public static final String WAL_ROLL_SIZE = "wal.rollSize";
-  public static final String WAL_MAX_LOGS_SIZE = "wal.maxLogsSize";
-  public static final String WAL_MIN_RETENTION_PERIOD = "wal.minRetentionPeriod";
-  public static final String WAL_WORKER_INTERVAL = "wal.workerInterval";
-  public static final String CAPACITY = "capacity";
-  public static final String KEEPALIVE = "keep-alive";
-
-  public static final int DEFAULT_CAPACITY = 100;
-  public static final int DEFAULT_KEEPALIVE = 3;
-
-  private MemoryChannel memoryChannel = new MemoryChannel();
-  private AtomicLong seqidGenerator = new AtomicLong(0);
-  private WAL<RecoverableMemoryChannelEvent> wal;
-  /**
-   * MemoryChannel checks to ensure the capacity is available
-   * on commit. That is a problem because we need to write to
-   * disk before we commit the data to MemoryChannel. As such
-   * we keep track of capacity ourselves.
-   */
-  private Semaphore queueRemaining;
-  private int capacity;
-  private int keepAlive;
-  private volatile boolean open;
-
-  public RecoverableMemoryChannel() {
-    open = false;
-  }
-
-  @Override
-  public void configure(Context context) {
-    memoryChannel.configure(context);
-    int capacity = context.getInteger(CAPACITY, DEFAULT_CAPACITY);
-    if(queueRemaining == null) {
-      queueRemaining = new Semaphore(capacity, true);
-    } else if(capacity > this.capacity) {
-      // capacity increase
-      queueRemaining.release(capacity - this.capacity);
-    } else if(capacity < this.capacity) {
-      queueRemaining.acquireUninterruptibly(this.capacity - capacity);
-    }
-    this.capacity = capacity;
-    keepAlive = context.getInteger(KEEPALIVE, DEFAULT_KEEPALIVE);
-    long rollSize = context.getLong(WAL_ROLL_SIZE, WAL.DEFAULT_ROLL_SIZE);
-    long maxLogsSize = context.getLong(WAL_MAX_LOGS_SIZE, WAL.DEFAULT_MAX_LOGS_SIZE);
-    long minLogRetentionPeriod = context.getLong(WAL_MIN_RETENTION_PERIOD, WAL.DEFAULT_MIN_LOG_RETENTION_PERIOD);
-    long workerInterval = context.getLong(WAL_WORKER_INTERVAL, WAL.DEFAULT_WORKER_INTERVAL);
-    if(wal == null) {
-      String homePath = System.getProperty("user.home").replace('\\', '/');
-      String dataDir = context.getString(WAL_DATA_DIR, homePath + "/.flume/recoverable-memory-channel");
-      try {
-        wal = new WAL<RecoverableMemoryChannelEvent>(new File(dataDir),
-            RecoverableMemoryChannelEvent.class, rollSize, maxLogsSize,
-            minLogRetentionPeriod, workerInterval);
-      } catch (IOException e) {
-        Throwables.propagate(e);
-      }
-    } else {
-      wal.setRollSize(rollSize);
-      wal.setMaxLogsSize(maxLogsSize);
-      wal.setMinLogRetentionPeriod(minLogRetentionPeriod);
-      wal.setWorkerInterval(workerInterval);
-      LOG.warn(this.getClass().getSimpleName() + " only supports " +
-          "partial reconfiguration.");
-    }
-  }
-
-  @Override
-  public synchronized void start() {
-    LOG.info("Starting " + this);
-    try {
-      WALReplayResult<RecoverableMemoryChannelEvent> results = wal.replay();
-      Preconditions.checkArgument(results.getSequenceID() >= 0);
-      LOG.info("Replay SequenceID " + results.getSequenceID());
-      seqidGenerator.set(results.getSequenceID());
-      int numResults = results.getResults().size();
-      Preconditions.checkState(numResults <= capacity, "Capacity " + capacity +
-          ", but we need to replay " + numResults);
-      LOG.info("Replay Events " + numResults);
-      for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
-        seqidGenerator.set(Math.max(entry.getSequenceID(),seqidGenerator.get()));
-      }
-      for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
-        Transaction transaction = null;
-        try {
-          transaction = memoryChannel.getTransaction();
-          transaction.begin();
-          memoryChannel.put(entry.getData());
-          transaction.commit();
-        } catch(Exception e) {
-          if(transaction != null) {
-            try {
-              transaction.rollback();
-            } catch(Exception ex) {
-              LOG.info("Error during rollback", ex);
-            }
-          }
-          Throwables.propagate(e);
-        } catch(Error e) {
-          if(transaction != null) {
-            try {
-              transaction.rollback();
-            } catch(Exception ex) {
-              LOG.info("Error during rollback", ex);
-            }
-          }
-          throw e;
-        } finally {
-          if(transaction != null) {
-            transaction.close();
-          }
-        }
-      }
-    } catch (IOException e) {
-      Throwables.propagate(e);
-    }
-    super.start();
-    open = true;
-  }
-
-  @Override
-  public synchronized void stop() {
-    open = false;
-    LOG.info("Stopping " + this);
-    try {
-      close();
-    } catch (IOException e) {
-      Throwables.propagate(e);
-    }
-    super.stop();
-  }
-
-  @Override
-  protected BasicTransactionSemantics createTransaction() {
-    return new RecoverableMemoryTransaction(this, memoryChannel);
-  }
-
-  private void commitEvents(List<RecoverableMemoryChannelEvent> events)
-      throws IOException {
-    List<WALEntry<RecoverableMemoryChannelEvent>> entries = Lists.newArrayList();
-    for(RecoverableMemoryChannelEvent event : events) {
-      entries.add(new WALEntry<RecoverableMemoryChannelEvent>(event, event.sequenceId));
-    }
-    wal.writeEntries(entries);
-  }
-  private void commitSequenceID(List<Long> seqids)
-      throws IOException {
-    wal.writeSequenceIDs(seqids);
-  }
-
-  private long nextSequenceID() {
-    return seqidGenerator.incrementAndGet();
-  }
-
-  void close() throws IOException {
-    if(wal != null) {
-      wal.close();
-    }
-  }
-
-  /**
-   * <p>
-   * An implementation of {@link Transaction} for {@link RecoverableMemoryChannel}s.
-   * </p>
-   */
-  private static class RecoverableMemoryTransaction extends BasicTransactionSemantics {
-
-    private Transaction transaction;
-    private MemoryChannel memoryChannel;
-    private RecoverableMemoryChannel channel;
-    private List<Long> sequenceIds = Lists.newArrayList();
-    private List<RecoverableMemoryChannelEvent> events = Lists.newArrayList();
-    private int takes;
-
-    private RecoverableMemoryTransaction(RecoverableMemoryChannel channel,
-        MemoryChannel memoryChannel) {
-      this.channel = channel;
-      this.memoryChannel = memoryChannel;
-      this.transaction = this.memoryChannel.getTransaction();
-      this.takes = 0;
-    }
-    @Override
-    protected void doBegin() throws InterruptedException {
-      transaction.begin();
-    }
-    @Override
-    protected void doPut(Event event) throws InterruptedException {
-      if(!channel.open) {
-        throw new ChannelException("Channel not open");
-      }
-      if(!channel.queueRemaining.tryAcquire(channel.keepAlive, TimeUnit.SECONDS)) {
-        throw new ChannelException("Cannot acquire capacity");
-      }
-      RecoverableMemoryChannelEvent sequencedEvent =
-          new RecoverableMemoryChannelEvent(event, channel.nextSequenceID());
-      memoryChannel.put(sequencedEvent);
-      events.add(sequencedEvent);
-    }
-
-    @Override
-    protected Event doTake() throws InterruptedException {
-      if(!channel.open) {
-        throw new ChannelException("Channel not open");
-      }
-      RecoverableMemoryChannelEvent event = (RecoverableMemoryChannelEvent)memoryChannel.take();
-      if(event != null) {
-        sequenceIds.add(event.sequenceId);
-        takes++;
-        return event.event;
-      }
-      return null;
-    }
-
-    @Override
-    protected void doCommit() throws InterruptedException {
-      if(!channel.open) {
-        throw new ChannelException("Channel not open");
-      }
-      if(sequenceIds.size() > 0) {
-        try {
-          channel.commitSequenceID(sequenceIds);
-        } catch (IOException e) {
-          throw new ChannelException("Unable to commit", e);
-        }
-      }
-      if(!events.isEmpty()) {
-        try {
-          channel.commitEvents(events);
-        } catch (IOException e) {
-          throw new ChannelException("Unable to commit", e);
-        }
-      }
-      transaction.commit();
-      channel.queueRemaining.release(takes);
-    }
-
-    @Override
-    protected void doRollback() throws InterruptedException {
-      sequenceIds.clear();
-      events.clear();
-      channel.queueRemaining.release(events.size());
-      transaction.rollback();
-    }
-
-    @Override
-    protected void doClose() {
-      sequenceIds.clear();
-      events.clear();
-      transaction.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java
deleted file mode 100644
index 1fd0d33..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.flume.Event;
-import org.apache.flume.event.SimpleEvent;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Maps;
-
-class RecoverableMemoryChannelEvent implements Event, Writable {
-  Event event;
-  long sequenceId;
-
-  // called via reflection
-  @SuppressWarnings("unused")
-  private RecoverableMemoryChannelEvent() {
-    this.event = new SimpleEvent();
-  }
-
-  RecoverableMemoryChannelEvent(Event event, long sequenceId) {
-    this.event = event;
-    this.sequenceId = sequenceId;
-  }
-  @Override
-  public Map<String, String> getHeaders() {
-    return event.getHeaders();
-  }
-  @Override
-  public void setHeaders(Map<String, String> headers) {
-    event.setHeaders(headers);
-  }
-  @Override
-  public byte[] getBody() {
-    return event.getBody();
-  }
-  @Override
-  public void setBody(byte[] body) {
-    event.setBody(body);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(sequenceId);
-    MapWritable map = toMapWritable(getHeaders());
-    map.write(out);
-    byte[] body = getBody();
-    if(body == null) {
-      out.writeInt(-1);
-    } else {
-      out.writeInt(body.length);
-      out.write(body);
-    }
-  }
-
-
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    sequenceId = in.readLong();
-    MapWritable map = new MapWritable();
-    map.readFields(in);
-    setHeaders(fromMapWritable(map));
-    byte[] body = null;
-    int bodyLength = in.readInt();
-    if(bodyLength != -1) {
-      body = new byte[bodyLength];
-      in.readFully(body);
-    }
-    setBody(body);
-  }
-  private MapWritable toMapWritable(Map<String, String> map) {
-    MapWritable result = new MapWritable();
-    if(map != null) {
-      for(Map.Entry<String, String> entry : map.entrySet()) {
-        result.put(new Text(entry.getKey()),new Text(entry.getValue()));
-      }
-    }
-    return result;
-  }
-  private Map<String, String> fromMapWritable(MapWritable map) {
-    Map<String, String> result = Maps.newHashMap();
-    if(map != null) {
-      for(Map.Entry<Writable, Writable> entry : map.entrySet()) {
-        result.put(entry.getKey().toString(),entry.getValue().toString());
-      }
-    }
-    return result;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
deleted file mode 100644
index 02b0f81..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.nio.ByteBuffer;
-import java.nio.LongBuffer;
-
-import org.apache.flume.tools.DirectMemoryUtils;
-import org.apache.hadoop.util.IndexedSortable;
-import org.apache.hadoop.util.QuickSort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Throwables;
-
-public class SequenceIDBuffer implements IndexedSortable {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SequenceIDBuffer.class);
-  private static final int SIZE_OF_LONG = 8;
-  private LongBuffer buffer;
-  private ByteBuffer backingBuffer;
-
-  public SequenceIDBuffer(int size) {
-    int bytesRequired = size * SIZE_OF_LONG;
-    backingBuffer = DirectMemoryUtils.allocate(bytesRequired);
-    buffer = backingBuffer.asLongBuffer();
-  }
-
-  @Override
-  public int compare(int leftIndex, int rightIndex) {
-    long left = get(leftIndex);
-    long right = get(rightIndex);
-    return (left < right ? -1 : (left == right ? 0 : 1));
-
-  }
-
-  public boolean exists(long value) {
-    return binarySearch(value) >= 0;
-  }
-
-  private int binarySearch(long value) {
-    int low = 0;
-    int high = size() - 1;
-
-    while (low <= high) {
-      int mid = (low + high) >>> 1;
-      long midVal = get(mid);
-
-      if (midVal < value) {
-        low = mid + 1;
-      } else if (midVal > value) {
-        high = mid - 1;
-      } else {
-        return mid; // key found
-      }
-    }
-    return -(low + 1); // key not found.
-  }
-
-  @Override
-  public void swap(int leftIndex, int rightIndex) {
-    long left = get(leftIndex);
-    long right = get(rightIndex);
-    put(leftIndex, right);
-    put(rightIndex, left);
-  }
-
-  public long get(int index) {
-    return buffer.get(index);
-  }
-
-  public void put(int index, long value) {
-    buffer.put(index, value);
-  }
-
-  public int size() {
-    return buffer.limit();
-  }
-
-  public void close() {
-    try {
-      DirectMemoryUtils.clean(backingBuffer);
-    } catch (Exception e) {
-      LOG.warn("Error cleaning up buffer", e);
-      if (LOG.isDebugEnabled()) {
-        Throwables.propagate(e);
-      }
-    }
-  }
-
-  public void sort() {
-    QuickSort quickSort = new QuickSort();
-    quickSort.sort(this, 0, size());
-  }
-
-  public static void main(String[] args) throws Exception {
-    try {
-      System.out.println("SequenceIDBuffer");
-      SequenceIDBuffer buffer = new SequenceIDBuffer(13107200);
-      buffer.close();
-      System.out.println("Array");
-      @SuppressWarnings("unused")
-      long[] array = new long[13107200];
-    } catch (Throwable t) {
-      t.printStackTrace();
-    } finally {
-      Thread.sleep(Long.MAX_VALUE);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
deleted file mode 100644
index 223fba5..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
+++ /dev/null
@@ -1,550 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-
-/**
- * Provides Write Ahead Log functionality for a generic Writable. All entries
- * stored in the WAL must be assigned a unique increasing sequence id. WAL
- * files will be removed when the following condition holds (defaults):
- *
- * At least 512MB of WAL's exist, the file in question is greater than
- * five minutes old and the largest committed sequence id is greater
- * than the largest sequence id in the file.
- *
- * <pre>
- *  WAL wal = new WAL(path, Writable.class);
- *  wal.writeEvent(event, 1);
- *  wal.writeEvent(event, 2);
- *  wal.writeSequenceID(1);
- *  wal.writeEvent(event, 3);
- *
- *  System crashes or shuts down...
- *
- *  WAL wal = new WAL(path, Writable.class);
- *  [Event 2, Event 3]  = wal.replay();
- * </pre>
- *
- * WAL files will be created in the specified data directory. By default they
- * are rolled at 64MB and deleted five minutes after they are no longer needed
- * (that is, once the largest committed sequence id is greater than the
- * greatest sequence id in the file).
- *
- * The only synchronization this class does is around rolling log files. When
- * a roll of the log file is required, the thread which discovers this
- * will execute the roll. Any threads calling a write*() method during
- * the roll will block until the roll is complete.
- */
-public class WAL<T extends Writable> implements Closeable {
-  private static final Logger LOG = LoggerFactory.getLogger(WAL.class);
-
-  private File path;
-  private File dataPath;
-  private File sequenceIDPath;
-  private Class<T> clazz;
-  private WALDataFile.Writer<T> dataFileWALWriter;
-  private WALDataFile.Writer<NullWritable> sequenceIDWALWriter;
-  private Map<String, Long> fileLargestSequenceIDMap = Collections
-      .synchronizedMap(new HashMap<String, Long>());
-  private AtomicLong largestCommitedSequenceID = new AtomicLong(0);
-  private volatile boolean rollRequired;
-  private volatile boolean rollInProgress;
-  private volatile long rollSize;
-  private volatile long maxLogsSize;
-  private volatile long minLogRetentionPeriod;
-  private volatile long workerInterval;
-  private int numReplaySequenceIDOverride;
-  private Worker backgroundWorker;
-
-  /**
-   * Number of bytes before we roll the file.
-   */
-  public static final long DEFAULT_ROLL_SIZE = 1024L * 1024L * 64L;
-  /**
-   * Number of bytes, to keep before we start pruning logs.
-   */
-  public static final long DEFAULT_MAX_LOGS_SIZE = 1024L * 1024L * 512L;
-  /**
-   * Minimum number of ms to keep a log file.
-   */
-  public static final long DEFAULT_MIN_LOG_RETENTION_PERIOD = 5L * 60L * 1000L;
-  /**
-   * How often in ms the background worker runs
-   */
-  public static final long DEFAULT_WORKER_INTERVAL = 60L * 1000L;
-
-  // used for testing only
-  WAL(File path, Class<T> clazz) throws IOException {
-    this(path, clazz, DEFAULT_ROLL_SIZE, DEFAULT_MAX_LOGS_SIZE,
-        DEFAULT_MIN_LOG_RETENTION_PERIOD, DEFAULT_WORKER_INTERVAL);
-  }
-
-  /**
-   * Creates a wal object with no defaults, using the specified parameters in
-   * the constructor for operation.
-   *
-   * The directory {@code path} and its {@code data} and {@code seq}
-   * sub-directories are created if missing. The name of {@code clazz} is
-   * written to a file called {@code clazz} inside {@code path} the first time
-   * the WAL is opened and is verified on every later open. A first pair of
-   * WAL files is opened and the background worker is started before the
-   * constructor returns.
-   *
-   * @param path
-   *          root directory of this WAL
-   * @param clazz
-   *          type of the payload stored in the WAL
-   * @param rollSize
-   *          bytes - max size of a single file before we roll
-   * @param maxLogsSize
-   *          bytes - total amount of logs to keep excluding the current log
-   * @param minLogRentionPeriod
-   *          ms - minimum amount of time to keep a log
-   * @param workerInterval
-   *          ms - how often the background worker checks for old logs
-   * @throws IOException
-   *           if a directory cannot be created, the recorded class name does
-   *           not match {@code clazz}, or the first WAL files cannot be opened
-   */
-  public WAL(File path, Class<T> clazz, long rollSize,
-      long maxLogsSize, long minLogRentionPeriod,
-      long workerInterval) throws IOException {
-    this.path = path;
-    this.rollSize = rollSize;
-    this.maxLogsSize = maxLogsSize;
-    this.minLogRetentionPeriod = minLogRentionPeriod;
-    this.workerInterval = workerInterval;
-
-    // Local, single-threaded buffer: no need for the synchronized StringBuffer.
-    StringBuilder buffer = new StringBuilder();
-    buffer.append("path = ").append(path).append(", ");
-    buffer.append("rollSize = ").append(rollSize).append(", ");
-    buffer.append("maxLogsSize = ").append(maxLogsSize).append(", ");
-    buffer.append("minLogRentionPeriod = ").append(minLogRentionPeriod).append(", ");
-    buffer.append("workerInterval = ").append(workerInterval);
-    LOG.info("WAL Parameters: " + buffer);
-
-    File clazzNamePath = new File(path, "clazz");
-    createOrDie(path);
-    if (clazzNamePath.exists()) {
-      // readFirstLine returns null for an empty file, so compare from the
-      // side that is known to be non-null instead of risking an NPE.
-      String clazzName = Files.readFirstLine(clazzNamePath, Charsets.UTF_8);
-      if (!clazz.getName().equals(clazzName)) {
-        throw new IOException("WAL is for " + clazzName
-            + " and you are passing " + clazz.getName());
-      }
-    } else {
-      Files.write(clazz.getName().getBytes(Charsets.UTF_8), clazzNamePath);
-    }
-
-    dataPath = new File(path, "data");
-    sequenceIDPath = new File(path, "seq");
-    createOrDie(dataPath);
-    createOrDie(sequenceIDPath);
-    this.clazz = clazz;
-
-    // Open the first WAL files before starting the worker so that a failed
-    // roll does not leave a background thread running for a half-built WAL.
-    roll();
-
-    backgroundWorker = new Worker(this);
-    backgroundWorker.setName("WAL-Worker-" + path.getAbsolutePath());
-    backgroundWorker.setDaemon(true);
-    backgroundWorker.start();
-  }
-
-  private void roll() throws IOException {
-    try {
-      rollInProgress = true;
-      LOG.info("Rolling WAL " + this.path);
-      if (dataFileWALWriter != null) {
-        fileLargestSequenceIDMap.put(dataFileWALWriter.getPath()
-            .getAbsolutePath(), dataFileWALWriter.getLargestSequenceID());
-        dataFileWALWriter.close();
-      }
-      if (sequenceIDWALWriter != null) {
-        fileLargestSequenceIDMap.put(sequenceIDWALWriter.getPath()
-            .getAbsolutePath(), sequenceIDWALWriter.getLargestSequenceID());
-        sequenceIDWALWriter.close();
-      }
-      long ts = System.currentTimeMillis();
-      File dataWalFileName = new File(dataPath, Long.toString(ts));
-      File seqWalFileName = new File(sequenceIDPath, Long.toString(ts));
-      while (dataWalFileName.exists() || seqWalFileName.exists()) {
-        ts++;
-        dataWalFileName = new File(dataPath, Long.toString(ts));
-        seqWalFileName = new File(sequenceIDPath, Long.toString(ts));
-      }
-
-      dataFileWALWriter = new WALDataFile.Writer<T>(dataWalFileName);
-      sequenceIDWALWriter = new WALDataFile.Writer<NullWritable>(seqWalFileName);
-      rollRequired = false;
-    } finally {
-      rollInProgress = false;
-      // already have lock but is more clear
-      synchronized (this) {
-        notifyAll();
-      }
-    }
-  }
-
-  public WALReplayResult<T> replay() throws IOException {
-    final AtomicLong sequenceID = new AtomicLong(0);
-    final Map<String, Long> fileLargestSequenceIDMap = Maps.newHashMap();
-    final AtomicLong totalBytes = new AtomicLong(0);
-    // first get the total amount of data we have to read in
-    readFiles(sequenceIDPath, new Function<File, Void>() {
-      @Override
-      public Void apply(File input) {
-        totalBytes.addAndGet(input.length());
-        return null;
-      }
-    });
-    // then estimate the size of the array
-    // needed to hold all the sequence ids
-    int baseSize = WALEntry.getBaseSize();
-    int numEntries = Math.max((int)((totalBytes.get() / baseSize) * 1.05f) + 1,
-        numReplaySequenceIDOverride);
-    LOG.info("Replay assumptions: baseSize = " + baseSize
-        + ", estimatedNumEntries " + numEntries);
-    final SequenceIDBuffer sequenceIDs = new SequenceIDBuffer(numEntries);
-
-    // read them all into ram
-    final AtomicInteger index = new AtomicInteger(0);
-    readFiles(sequenceIDPath, new Function<File, Void>() {
-      @Override
-      public Void apply(File input) {
-        LOG.info("Replaying " + input);
-        WALDataFile.Reader<NullWritable> reader = null;
-        int localIndex = index.get();
-        try {
-          // item stored is a NullWritable so we only store the base WALEntry
-          reader = new WALDataFile.Reader<NullWritable>(input,
-              NullWritable.class);
-          List<WALEntry<NullWritable>> batch;
-          long largestForFile = Long.MIN_VALUE;
-          while ((batch = reader.nextBatch()) != null) {
-            for(WALEntry<NullWritable> entry : batch) {
-              long current = entry.getSequenceID();
-              sequenceIDs.put(localIndex++, current);
-              largestForFile = Math.max(largestForFile, current);
-            }
-          }
-          sequenceID.set(Math.max(largestForFile, sequenceID.get()));
-          fileLargestSequenceIDMap.put(input.getAbsolutePath(),
-              largestForFile);
-        } catch (IOException e) {
-          Throwables.propagate(e);
-        } finally {
-          index.set(localIndex);
-          if (reader != null) {
-            try {
-              reader.close();
-            } catch (IOException e) {
-            }
-          }
-        }
-        return null;
-      }
-    });
-
-    sequenceIDs.sort();
-
-    // now read all edits storing items with a sequence id
-    // which is *not* in the sequenceIDs
-    final List<WALEntry<T>> entries = Lists.newArrayList();
-    final Class<T> dataClazz = clazz;
-    readFiles(dataPath, new Function<File, Void>() {
-      @Override
-      public Void apply(File input) {
-        LOG.info("Replaying " + input);
-        WALDataFile.Reader<T> reader = null;
-        try {
-          reader = new WALDataFile.Reader<T>(input, dataClazz);
-          List<WALEntry<T>> batch = Lists.newArrayList();
-          long largestForFile = Long.MIN_VALUE;
-          while ((batch = reader.nextBatch()) != null) {
-            for(WALEntry<T> entry : batch) {
-              long current = entry.getSequenceID();
-              if (!sequenceIDs.exists(current)) {
-                entries.add(entry);
-              }
-              largestForFile = Math.max(largestForFile, current);
-            }
-          }
-          sequenceID.set(Math.max(largestForFile, sequenceID.get()));
-          fileLargestSequenceIDMap.put(input.getAbsolutePath(),
-              largestForFile);
-        } catch (IOException e) {
-          Throwables.propagate(e);
-        } finally {
-          if (reader != null) {
-            try {
-              reader.close();
-            } catch (IOException e) {
-            }
-          }
-        }
-        return null;
-      }
-    });
-    sequenceIDs.close();
-    synchronized (this.fileLargestSequenceIDMap) {
-      this.fileLargestSequenceIDMap.clear();
-      this.fileLargestSequenceIDMap.putAll(fileLargestSequenceIDMap);
-      LOG.info("SequenceIDMap " + fileLargestSequenceIDMap);
-    }
-    largestCommitedSequenceID.set(sequenceID.get());
-    LOG.info("Replay complete: LargestCommitedSequenceID = " + largestCommitedSequenceID.get());
-    return new WALReplayResult<T>(entries, largestCommitedSequenceID.get());
-  }
-
-  public void writeEntries(List<WALEntry<T>> entries) throws IOException {
-    Preconditions.checkNotNull(dataFileWALWriter,
-        "Write is null, close must have been called");
-    synchronized (this) {
-      if (isRollRequired()) {
-        roll();
-      }
-    }
-    waitWhileRolling();
-    boolean error = true;
-    try {
-      dataFileWALWriter.append(entries);
-      error = false;
-    } finally {
-      if (error) {
-        rollRequired = true;
-      }
-    }
-  }
-
-  public void writeEntry(WALEntry<T> entry) throws IOException {
-    List<WALEntry<T>> entries = Lists.newArrayList();
-    entries.add(entry);
-    writeEntries(entries);
-  }
-
-  public void writeSequenceID(long sequenceID) throws IOException {
-    List<Long> sequenceIDs = Lists.newArrayList();
-    sequenceIDs.add(sequenceID);
-    writeSequenceIDs(sequenceIDs);
-  }
-  /**
-   * Marks the given sequence ids as committed by writing them to the
-   * sequence id WAL as one batch, rolling the WAL files first if required.
-   * Nothing is written when {@code sequenceIDs} is empty.
-   *
-   * @param sequenceIDs
-   *          sequence ids to record as committed
-   * @throws IOException
-   *           if rolling or appending fails; a failed append also forces a
-   *           roll on the next write
-   */
-  public void writeSequenceIDs(List<Long> sequenceIDs) throws IOException {
-    Preconditions.checkNotNull(sequenceIDWALWriter,
-        "Write is null, close must have been called");
-    synchronized (this) {
-      if (isRollRequired()) {
-        roll();
-      }
-    }
-    waitWhileRolling();
-    boolean error = true;
-    try {
-      List<WALEntry<NullWritable>> entries = Lists.newArrayList();
-      for (Long sequenceID : sequenceIDs) {
-        largestCommitedSequenceID.set(Math.max(sequenceID,
-            largestCommitedSequenceID.get()));
-        entries.add(new WALEntry<NullWritable>(NullWritable.get(), sequenceID));
-      }
-      // Append once, after the whole batch is built. Appending inside the
-      // loop re-wrote the growing list on every iteration, so ids [1, 2, 3]
-      // were persisted as [1], [1, 2], [1, 2, 3].
-      if (!entries.isEmpty()) {
-        sequenceIDWALWriter.append(entries);
-      }
-      error = false;
-    } finally {
-      if (error) {
-        rollRequired = true;
-      }
-    }
-  }
-
-  /**
-   * Blocks the calling thread until no roll is in progress.
-   *
-   * An interrupt received while waiting does not abort the wait; it is
-   * remembered and the thread's interrupt status is restored on return.
-   */
-  private void waitWhileRolling() {
-    boolean interrupted = false;
-    synchronized (this) {
-      while (rollInProgress) {
-        try {
-          wait();
-        } catch (InterruptedException e) {
-          // Do not re-interrupt here: with the flag set the next wait()
-          // would throw immediately and this loop would spin.
-          interrupted = true;
-        }
-      }
-    }
-    if (interrupted) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Stops the background worker and closes both WAL writers. The data
-   * writer is closed even if closing the sequence id writer fails.
-   *
-   * @throws IOException
-   *           if closing either writer fails
-   */
-  @Override
-  public void close() throws IOException {
-    if (backgroundWorker != null) {
-      backgroundWorker.shutdown();
-    }
-    try {
-      if (sequenceIDWALWriter != null) {
-        sequenceIDWALWriter.close();
-        sequenceIDWALWriter = null;
-      }
-    } finally {
-      // Runs even when the close above throws, so the data file is not leaked.
-      if (dataFileWALWriter != null) {
-        dataFileWALWriter.close();
-        dataFileWALWriter = null;
-      }
-    }
-  }
-
-  private boolean isRollRequired() throws IOException {
-    if (rollRequired) {
-      return true;
-    }
-    return Math.max(dataFileWALWriter.getSize(), sequenceIDWALWriter.getSize()) > rollSize;
-  }
-
-  private void readFiles(File path, Function<File, Void> function)
-      throws IOException {
-    File[] dataFiles = path.listFiles();
-    List<File> files = Lists.newArrayList();
-    if (dataFiles != null) {
-      for (File dataFile : dataFiles) {
-        if (!dataFile.isFile()) {
-          throw new IOException("Not file " + dataFile);
-        }
-        files.add(dataFile);
-      }
-    }
-    for (File dataFile : files) {
-      function.apply(dataFile);
-    }
-  }
-
-  private void createOrDie(File path) throws IOException {
-    if (!path.isDirectory()) {
-      if (!path.mkdirs()) {
-        throw new IOException("Unable to create " + path);
-      }
-    }
-  }
-
-  private static class Worker extends Thread {
-    private WAL<? extends Writable> wal;
-    private volatile boolean run = true;
-    public Worker(WAL<? extends Writable> wal) {
-      this.wal = wal;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("Background worker reporting for duty");
-      while (run) {
-        try {
-          try {
-            Thread.sleep(wal.workerInterval);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-          if (!run) {
-            continue;
-          }
-          List<String> filesToRemove = Lists.newArrayList();
-          long totalSize = 0;
-          synchronized (wal.fileLargestSequenceIDMap) {
-            for (String key : wal.fileLargestSequenceIDMap.keySet()) {
-              File file = new File(key);
-              totalSize += file.length();
-            }
-            if (totalSize >= wal.maxLogsSize) {
-              for (String key : wal.fileLargestSequenceIDMap.keySet()) {
-                File file = new File(key);
-                Long seqid = wal.fileLargestSequenceIDMap.get(key);
-                long largestCommitedSeqID = wal.largestCommitedSequenceID.get();
-                if (file.exists()
-                    // has not been modified in 5 minutes
-                    && System.currentTimeMillis() - file.lastModified() > wal.minLogRetentionPeriod
-                    // current seqid is greater than the largest seqid in the file
-                    && largestCommitedSeqID > seqid) {
-                  filesToRemove.add(key);
-                  LOG.info("Removing expired file " + key + ", seqid = "
-                      + seqid + ", result = " + file.delete());
-                }
-              }
-              for (String key : filesToRemove) {
-                wal.fileLargestSequenceIDMap.remove(key);
-              }
-            }
-          }
-        } catch (Exception ex) {
-          LOG.error("Uncaught exception in background worker", ex);
-        }
-      }
-      LOG.warn(this.getClass().getSimpleName()
-          + " moving on due to stop request");
-    }
-
-    public void shutdown() {
-      run = false;
-      this.interrupt();
-    }
-  }
-
-
-  public void setRollSize(long rollSize) {
-    this.rollSize = rollSize;
-  }
-
-  public void setMaxLogsSize(long maxLogsSize) {
-    this.maxLogsSize = maxLogsSize;
-  }
-
-  public void setMinLogRetentionPeriod(long minLogRetentionPeriod) {
-    this.minLogRetentionPeriod = minLogRetentionPeriod;
-  }
-
-  public void setWorkerInterval(long workerInterval) {
-    this.workerInterval = workerInterval;
-  }
-
-  /**
-   * Reads in a WAL and writes out a new WAL. Used if for some reason a replay
-   * cannot occur due to the size of the WAL or assumptions about the number of
-   * sequenceids.
-   *
-   * Arguments: input directory, output directory, payload class name and,
-   * optionally, a minimum size for the sequence id buffer used during replay.
-   */
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public static void main(String[] args) throws IOException,
-  ClassNotFoundException {
-    // checkElementIndex rejects index == length; checkPositionIndex accepts
-    // it, which let missing arguments through to an
-    // ArrayIndexOutOfBoundsException below.
-    Preconditions.checkElementIndex(0, args.length,
-        "input directory is a required arg");
-    Preconditions.checkElementIndex(1, args.length,
-        "output directory is a required arg");
-    Preconditions.checkElementIndex(2, args.length,
-        "classname is a required arg");
-    String input = args[0];
-    String output = args[1];
-    Class clazz = Class.forName(args[2].trim());
-    WAL inputWAL = new WAL(new File(input), clazz);
-    WALReplayResult<?> result;
-    try {
-      if (args.length == 4) {
-        inputWAL.numReplaySequenceIDOverride = Integer.parseInt(args[3]);
-        System.out.println("Overridng numReplaySequenceIDOverride: "
-            + inputWAL.numReplaySequenceIDOverride);
-      }
-      result = inputWAL.replay();
-    } finally {
-      // Close the input WAL (and stop its worker) even if replay fails.
-      inputWAL.close();
-    }
-    System.out.println("     SeqID: " + result.getSequenceID());
-    System.out.println("NumEntries: " + result.getResults().size());
-    WAL outputWAL = new WAL(new File(output), clazz);
-    try {
-      outputWAL.writeEntries(result.getResults());
-    } finally {
-      outputWAL.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
deleted file mode 100644
index 823f17c..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.collect.Lists;
-
-class WALDataFile<T extends Writable> {
-
-  private static final int VERSION = 1;
-
-  private static final int RECORD_TYPE_EVENT = 1;
-  private static final int RECORD_TYPE_COMMIT = 2;
-
-  /**
-   * Reads batches of {@link WALEntry} records from a single WAL file. The
-   * file must start with the expected {@code VERSION} header.
-   */
-  static class Reader<T extends Writable> implements Closeable {
-    Class<T> clazz;
-    DataInputStream input;
-    private Configuration conf = new Configuration();
-
-    /**
-     * Opens {@code path} and validates its version header.
-     *
-     * @throws IOException
-     *           if the file cannot be opened or read, or its version is not
-     *           {@code VERSION}; the underlying stream is closed in that case
-     */
-    Reader(File path, Class<T> clazz) throws IOException {
-      this.clazz = clazz;
-      input = new DataInputStream(new FileInputStream(path));
-      boolean success = false;
-      try {
-        int version = input.readInt();
-        if(version != VERSION) {
-          throw new IOException("Expected " + VERSION + " and got " + version);
-        }
-        success = true;
-      } finally {
-        if (!success) {
-          // The caller never receives this Reader, so release the file here.
-          try {
-            input.close();
-          } catch (IOException ignored) {
-            // keep the original failure as the one that is reported
-          }
-        }
-      }
-    }
-
-    /**
-     * Reads the next complete batch.
-     *
-     * @return the entries read up to the next commit marker, or
-     *         {@code null} if end of file is reached before a commit marker
-     * @throws IOException
-     *           if an unknown record type is found or reading fails
-     */
-    List<WALEntry<T>> nextBatch() throws IOException {
-      List<WALEntry<T>> batch = Lists.newArrayList();
-      // read until we hit a commit marker or until the
-      // end of the file is reached
-      while(true) {
-        try {
-          int type = input.readInt();
-          if(type == RECORD_TYPE_EVENT) {
-            WALEntry<T> entry = newWALEntry(clazz, conf);
-            entry.readFields(input);
-            batch.add(entry);
-          } else if(type == RECORD_TYPE_COMMIT) {
-            // we only return what we have read if we find a commit entry
-            return batch;
-          } else {
-            throw new IOException("Unknown record type " + Integer.toHexString(type));
-          }
-        } catch(EOFException e) {
-          // in the EOF case, we crashed or shutdown while writing a batch
-          // and were unable to complete that batch. As such the client
-          // would have gotten an exception and retried or locally
-          // stored the batch for resending later
-          return null;
-        }
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (input != null) {
-        input.close();
-      }
-    }
-  }
-
-  /**
-   * Append and flush operations are synchronized as we are modifying
-   * a file in said methods.
-   */
-  static class Writer<T extends Writable> implements Closeable {
-    private FileOutputStream fileOutput;
-    private DataOutputStream dataOutput;
-    private AtomicLong largestSequenceID = new AtomicLong(0);
-    private File path;
-
-    Writer(File path) throws IOException {
-      this.path = path;
-      fileOutput = new FileOutputStream(path);
-      dataOutput = new DataOutputStream(fileOutput);
-      dataOutput.writeInt(VERSION);
-      flush();
-    }
-
-    // TODO group commit
-    synchronized void append(List<WALEntry<T>> entries) throws IOException {
-      for (WALEntry<T> entry : entries) {
-        largestSequenceID.set(Math.max(entry.getSequenceID(), largestSequenceID.get()));
-        dataOutput.writeInt(RECORD_TYPE_EVENT);
-        entry.write(dataOutput);
-      }
-      // if this is successful, the events have been
-      // successfully persisted and will be replayed
-      // in the case of a crash
-      dataOutput.writeInt(RECORD_TYPE_COMMIT);
-      flush(false);
-    }
-
-    synchronized void flush() throws IOException {
-      flush(true);
-    }
-    synchronized void flush(boolean metadata) throws IOException {
-      fileOutput.getChannel().force(metadata);
-    }
-
-    public long getLargestSequenceID() {
-      return largestSequenceID.get();
-    }
-    public File getPath() {
-      return path;
-    }
-
-    public long getSize() {
-      return dataOutput.size();
-    }
-
-    @Override
-    public synchronized void close() throws IOException {
-      if (dataOutput != null) {
-        flush();
-        dataOutput.close();
-      }
-    }
-  }
-  private static <T extends Writable> WALEntry<T> newWALEntry(Class<T> clazz, Configuration conf) {
-    return new WALEntry<T>(ReflectionUtils.newInstance(clazz, conf));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java
deleted file mode 100644
index 3bb69f9..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Wraps a {@link Writable} with a sequence id so that both can
- * be written together to a file.
- */
-public class WALEntry<T extends Writable> implements Writable {
-  /**
-   * Provides a minimum guarantee we are not reading complete junk
-   */
-  private static final int MAGIC_HEADER = 0xdeadbeef;
-
-  private T data;
-  private long sequenceID;
-  /**
-   * Only to be used when reading a wal file from disk
-   */
-  WALEntry(T data) {
-    this(data, -1);
-  }
-  /**
-   * Creates a WALEntry with specified payload and sequence id
-   * @param data
-   * @param sequenceID
-   */
-  public WALEntry(T data, long sequenceID) {
-    this.data = data;
-    this.sequenceID = sequenceID;
-  }
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int header = in.readInt();
-    if(header != MAGIC_HEADER) {
-      throw new IOException("Header is " + Integer.toHexString(header) +
-          " expected " + Integer.toHexString(MAGIC_HEADER));
-    }
-    sequenceID = in.readLong();
-    data.readFields(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(MAGIC_HEADER);
-    out.writeLong(sequenceID);
-    data.write(out);
-  }
-
-  public T getData() {
-    return data;
-  }
-
-  public long getSequenceID() {
-    return sequenceID;
-  }
-
-  static int getBaseSize() {
-    int base = 4 /* magic header of type int */ + 8 /* seqid of type long */;
-    return base;
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java
deleted file mode 100644
index e11b575..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-
-public class WALReplayResult<T extends Writable> {
-
-  private List<WALEntry<T>> results;
-  private long sequenceID;
-
-  public WALReplayResult(List<WALEntry<T>> results, long sequenceID) {
-    this.results = results;
-    this.sequenceID = sequenceID;
-  }
-  public List<WALEntry<T>> getResults() {
-    return results;
-  }
-  public long getSequenceID() {
-    return sequenceID;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java b/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
deleted file mode 100644
index d6313d7..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.channel.recoverable.memory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.sink.NullSink;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-public class TestRecoverableMemoryChannel {
-
-  private static final Logger logger = LoggerFactory
-  .getLogger(TestRecoverableMemoryChannel.class);
-
-  private RecoverableMemoryChannel channel;
-  Context context;
-
-  private File dataDir;
-
-  @Before
-  public void setUp() {
-    dataDir = Files.createTempDir();
-    Assert.assertTrue(dataDir.isDirectory());
-    channel = createFileChannel();
-
-  }
-
-  private RecoverableMemoryChannel createFileChannel() {
-    RecoverableMemoryChannel channel = new RecoverableMemoryChannel();
-    context = new Context();
-    context.put(RecoverableMemoryChannel.WAL_DATA_DIR, dataDir.getAbsolutePath());
-    Configurables.configure(channel, context);
-    channel.start();
-    return channel;
-  }
-
-  @After
-  public void teardown() {
-    FileUtils.deleteQuietly(dataDir);
-  }
-  @Test
-  public void testRestart() throws Exception {
-    List<String> in = Lists.newArrayList();
-    try {
-      while(true) {
-        in.addAll(putEvents(channel, "restart", 1, 1));
-      }
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity", e.getMessage());
-    }
-    channel.stop();
-    channel = createFileChannel();
-    List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
-    Collections.sort(in);
-    Collections.sort(out);
-    Assert.assertEquals(in, out);
-  }
-  @Test
-  public void testReconfigure() throws Exception {
-    List<String> in = Lists.newArrayList();
-    try {
-      while(true) {
-        in.addAll(putEvents(channel, "restart", 1, 1));
-      }
-    } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity", e.getMessage());
-    }
-    Configurables.configure(channel, context);
-    List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
-    Collections.sort(in);
-    Collections.sort(out);
-    Assert.assertEquals(in, out);
-  }
-  @Test
-  public void testRollbackWithSink() throws Exception {
-    final NullSink nullSink = new NullSink();
-    Context ctx = new Context();
-    ctx.put("batchSize", "1");
-    nullSink.configure(ctx);
-    nullSink.setChannel(channel);
-    final int numItems = 99;
-    Thread t = new Thread() {
-      @Override
-      public void run() {
-        int count = 0;
-        while(count++ < numItems) {
-          try {
-            nullSink.process();
-            Thread.sleep(1);
-          } catch(EventDeliveryException e) {
-            break;
-          } catch (Exception e) {
-            Throwables.propagate(e);
-          }
-        }
-      }
-    };
-    t.setDaemon(true);
-    t.setName("NullSink");
-    t.start();
-
-    putEvents(channel, "rollback", 10, 100);
-
-    Transaction transaction;
-    // put an item we will rollback
-    transaction = channel.getTransaction();
-    transaction.begin();
-    channel.put(EventBuilder.withBody("this is going to be rolledback".getBytes(Charsets.UTF_8)));
-    transaction.rollback();
-    transaction.close();
-
-    while(t.isAlive()) {
-      Thread.sleep(1);
-    }
-
-
-    // simulate crash
-    channel.stop();
-    channel = createFileChannel();
-
-    // get the item which was not rolled back
-    transaction = channel.getTransaction();
-    transaction.begin();
-    Event event = channel.take();
-    transaction.commit();
-    transaction.close();
-    Assert.assertNotNull(event);
-    Assert.assertEquals("rollback-90-9", new String(event.getBody(), Charsets.UTF_8));
-  }
-
-
-  @Test
-  public void testRollback() throws Exception {
-    // put an item and commit
-    putEvents(channel, "rollback", 1, 50);
-
-    Transaction transaction;
-    // put an item we will rollback
-    transaction = channel.getTransaction();
-    transaction.begin();
-    channel.put(EventBuilder.withBody("this is going to be rolledback".getBytes(Charsets.UTF_8)));
-    transaction.rollback();
-    transaction.close();
-
-    // simulate crash
-    channel.stop();
-    channel = createFileChannel();
-
-    // get the item which was not rolled back
-    transaction = channel.getTransaction();
-    transaction.begin();
-    Event event = channel.take();
-    transaction.commit();
-    transaction.close();
-    Assert.assertNotNull(event);
-    Assert.assertEquals("rollback-0-0", new String(event.getBody(), Charsets.UTF_8));
-  }
-  @Test
-  public void testPut() throws Exception {
-    // should find no items
-    int found = takeEvents(channel, 1, 5).size();
-    Assert.assertEquals(0, found);
-    putEvents(channel, "unbatched", 1, 5);
-    putEvents(channel, "batched", 5, 5);
-  }
-  @Test
-  public void testThreaded() throws IOException, InterruptedException {
-    int numThreads = 10;
-    final CountDownLatch producerStopLatch = new CountDownLatch(numThreads);
-    // due to limited capacity we must wait for consumers to start to put
-    final CountDownLatch consumerStartLatch = new CountDownLatch(numThreads);
-    final CountDownLatch consumerStopLatch = new CountDownLatch(numThreads);
-    final List<Exception> errors = Collections
-        .synchronizedList(new ArrayList<Exception>());
-    final List<String> expected = Collections
-        .synchronizedList(new ArrayList<String>());
-    final List<String> actual = Collections
-        .synchronizedList(new ArrayList<String>());
-    for (int i = 0; i < numThreads; i++) {
-      final int id = i;
-      Thread t = new Thread() {
-        @Override
-        public void run() {
-          try {
-            consumerStartLatch.await();
-            if (id % 2 == 0) {
-              expected.addAll(putEvents(channel, Integer.toString(id), 1, 5));
-            } else {
-              expected.addAll(putEvents(channel, Integer.toString(id), 5, 5));
-            }
-            logger.info("Completed some puts " + expected.size());
-          } catch (Exception e) {
-            logger.error("Error doing puts", e);
-            errors.add(e);
-          } finally {
-            producerStopLatch.countDown();
-          }
-        }
-      };
-      t.setDaemon(true);
-      t.start();
-    }
-    for (int i = 0; i < numThreads; i++) {
-      final int id = i;
-      Thread t = new Thread() {
-        @Override
-        public void run() {
-          try {
-            consumerStartLatch.countDown();
-            consumerStartLatch.await();
-            while(!producerStopLatch.await(1, TimeUnit.SECONDS) ||
-                expected.size() > actual.size()) {
-              if (id % 2 == 0) {
-                actual.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
-              } else {
-                actual.addAll(takeEvents(channel, 5, Integer.MAX_VALUE));
-              }
-            }
-            if(actual.isEmpty()) {
-              logger.error("Found nothing!");
-            } else {
-              logger.info("Completed some takes " + actual.size());
-            }
-          } catch (Exception e) {
-            logger.error("Error doing takes", e);
-            errors.add(e);
-          } finally {
-            consumerStopLatch.countDown();
-          }
-        }
-      };
-      t.setDaemon(true);
-      t.start();
-    }
-    Assert.assertTrue("Timed out waiting for producers",
-        producerStopLatch.await(30, TimeUnit.SECONDS));
-    Assert.assertTrue("Timed out waiting for consumer",
-        consumerStopLatch.await(30, TimeUnit.SECONDS));
-    Assert.assertEquals(Collections.EMPTY_LIST, errors);
-    Collections.sort(expected);
-    Collections.sort(actual);
-    Assert.assertEquals(expected, actual);
-  }
-  private static List<String> takeEvents(Channel channel, int batchSize,
-      int numEvents) throws Exception {
-    List<String> result = Lists.newArrayList();
-    for (int i = 0; i < numEvents; i += batchSize) {
-      for (int j = 0; j < batchSize; j++) {
-        Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        try {
-          Event event = channel.take();
-          if(event == null) {
-            transaction.commit();
-            return result;
-          }
-          result.add(new String(event.getBody(), Charsets.UTF_8));
-          transaction.commit();
-        } catch (Exception ex) {
-          transaction.rollback();
-          throw ex;
-        } finally {
-          transaction.close();
-        }
-      }
-    }
-    return result;
-  }
-  private static List<String> putEvents(Channel channel, String prefix, int batchSize,
-      int numEvents) throws Exception {
-    List<String> result = Lists.newArrayList();
-    for (int i = 0; i < numEvents; i += batchSize) {
-      for (int j = 0; j < batchSize; j++) {
-        Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        try {
-          String s = prefix + "-" + i +"-" + j;
-          Event event = EventBuilder.withBody(s.getBytes(Charsets.UTF_8));
-          result.add(s);
-          channel.put(event);
-          transaction.commit();
-        } catch (Exception ex) {
-          transaction.rollback();
-          throw ex;
-        } finally {
-          transaction.close();
-        }
-      }
-    }
-    return result;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java b/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java
deleted file mode 100644
index 4a7ce28..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.util.Random;
-
-import org.apache.flume.channel.recoverable.memory.wal.SequenceIDBuffer;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestSequenceIDBuffer {
-
-  @Test
-  public void testBinarySearch() {
-    int size = 100;
-    SequenceIDBuffer buffer = new SequenceIDBuffer(size);
-    Assert.assertEquals(size, buffer.size());
-    for (int i = 0; i < 100; i++) {
-      buffer.put(i, i);
-    }
-    buffer.sort();
-    Assert.assertFalse(buffer.exists(-1));
-    Assert.assertFalse(buffer.exists(101));
-    for (int i = 0; i < 100; i++) {
-      Assert.assertTrue(buffer.exists(i));
-    }
-  }
-
-  @Test
-  public void testSortAndCompareTo() {
-    int size = 100;
-    SequenceIDBuffer buffer = new SequenceIDBuffer(size);
-    Assert.assertEquals(size, buffer.size());
-    Random random = new Random();
-    for (int i = 0; i < 100; i++) {
-      buffer.put(i, Math.abs(random.nextLong()));
-    }
-
-    buffer.sort();
-
-    long last = Long.MIN_VALUE;
-    for (int i = 0; i < 100; i++) {
-      long current = buffer.get(i);
-      Assert.assertTrue(last <= current);
-    }
-  }
-
-  @Test
-  public void testSwap() {
-    SequenceIDBuffer buffer = new SequenceIDBuffer(2);
-    buffer.put(0, Long.MAX_VALUE);
-    buffer.put(1, Long.MIN_VALUE);
-    buffer.swap(0, 1);
-    Assert.assertEquals(buffer.get(0), Long.MIN_VALUE);
-    Assert.assertEquals(buffer.get(1), Long.MAX_VALUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java b/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java
deleted file mode 100644
index 52e3606..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flume.channel.recoverable.memory.wal.WAL;
-import org.apache.flume.channel.recoverable.memory.wal.WALEntry;
-import org.apache.flume.channel.recoverable.memory.wal.WALReplayResult;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-public class TestWAL {
-
-  private static final Logger logger = LoggerFactory
-      .getLogger(TestWAL.class);
-
-  private File dataDir;
-  private WAL<Text> wal;
-
-  @Before
-  public void setup() throws IOException {
-    dataDir = Files.createTempDir();
-    Assert.assertTrue(dataDir.isDirectory());
-    wal = new WAL<Text>(dataDir, Text.class);
-  }
-  @After
-  public void teardown() throws IOException {
-    wal.close();
-    FileUtils.deleteQuietly(dataDir);
-  }
-
-  /**
-   * Create a whole bunch of files and ensure they are cleaned up
-   */
-  @Test
-  public void testRoll() throws IOException, InterruptedException {
-    wal.close();
-    wal = new WAL<Text>(dataDir, Text.class, 0L, 0L, 0L, 1L);
-    long seqid = 0;
-    List<String> expected = strings(100);
-    for(String s : expected) {
-      wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
-      Thread.sleep(1);
-      wal.writeSequenceID(seqid);
-      Thread.sleep(1);
-    }
-    wal.writeSequenceID(Long.MAX_VALUE);
-    Thread.sleep(1000L);
-    wal.close();
-    File seq = new File(dataDir, "seq");
-    File[] seqFiles = seq.listFiles();
-    Assert.assertNotNull(seqFiles);
-    Assert.assertTrue(seqFiles.length < 5);
-    File data = new File(dataDir, "data");
-    File[] dataFiles = data.listFiles();
-    Assert.assertNotNull(dataFiles);
-    Assert.assertTrue(dataFiles.length < 5);
-  }
-
-  @Test
-  public void testBasicReplay() throws IOException {
-    long seqid = 0;
-    List<String> expected = strings(100);
-    for(String s : expected) {
-      wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
-    }
-    wal.close();
-    wal = new WAL<Text>(dataDir, Text.class);
-    WALReplayResult<Text> result = wal.replay();
-    Assert.assertEquals(100, result.getSequenceID());
-    List<String> actual = toStringList(result.getResults());
-    Assert.assertEquals(expected, actual);
-  }
-
-  @Test
-  public void testReplayAtOffset() throws IOException {
-    long seqid = 0;
-    List<String> expected = strings(100);
-    for(String s : expected) {
-      wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
-    }
-    wal.writeSequenceID(50);
-    expected.remove(50);
-    wal.close();
-    wal = new WAL<Text>(dataDir, Text.class);
-    WALReplayResult<Text> result = wal.replay();
-    Assert.assertEquals(100, result.getSequenceID());
-    List<String> actual = toStringList(result.getResults());
-    Collections.sort(expected);
-    Collections.sort(actual);
-    Assert.assertEquals(99, actual.size());
-    Assert.assertEquals(expected, actual);
-  }
-
-  @Test
-  public void testReplayNone() throws IOException {
-    long seqid = 0;
-    List<String> expected = strings(100);
-    for(String s : expected) {
-      wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
-      wal.writeSequenceID(seqid);
-    }
-    wal.close();
-    wal = new WAL<Text>(dataDir, Text.class);
-    WALReplayResult<Text> result = wal.replay();
-    Assert.assertEquals(expected.size(), result.getSequenceID());
-    List<String> actual = toStringList(result.getResults());
-    Assert.assertEquals(Collections.EMPTY_LIST, actual);
-  }
-
-  @Test
-  public void testThreadedAppend() throws IOException, InterruptedException {
-    int numThreads = 10;
-    final CountDownLatch startLatch = new CountDownLatch(numThreads);
-    final CountDownLatch stopLatch = new CountDownLatch(numThreads);
-    final AtomicLong seqid = new AtomicLong(0);
-    final List<String> globalExpected = Collections.synchronizedList(new ArrayList<String>());
-    final List<Exception> errors = Collections.synchronizedList(new ArrayList<Exception>());
-    for (int i = 0; i < numThreads; i++) {
-      final int id = i;
-      Thread t = new Thread() {
-        @Override
-        public void run() {
-          try {
-            List<String> expected = strings(100);
-            globalExpected.addAll(expected);
-            startLatch.countDown();
-            startLatch.await();
-            // half batch, half do not
-            if(id % 2 == 0) {
-              for(String s : expected) {
-                wal.writeEntry(new WALEntry<Text>(new Text(s), seqid.incrementAndGet()));
-              }
-            } else {
-              List<WALEntry<Text>> batch = Lists.newArrayList();
-              for(String s : expected) {
-                batch.add(new WALEntry<Text>(new Text(s), seqid.incrementAndGet()));
-              }
-              wal.writeEntries(batch);
-            }
-          } catch (Exception e) {
-            logger.warn("Error doing appends", e);
-            errors.add(e);
-          } finally {
-            stopLatch.countDown();
-          }
-        }
-      };
-      t.setDaemon(true);
-      t.start();
-    }
-    Assert.assertTrue(stopLatch.await(30, TimeUnit.SECONDS));
-    Assert.assertEquals(Collections.EMPTY_LIST, errors);
-    wal.close();
-    wal = new WAL<Text>(dataDir, Text.class);
-    WALReplayResult<Text> result = wal.replay();
-    Assert.assertEquals(1000, result.getSequenceID());
-    List<String> actual = toStringList(result.getResults());
-    // we don't know what order the items threads will be able to
-    // append to the wal, so sort to the lists to make then sensible
-    Collections.sort(actual);
-    Collections.sort(globalExpected);
-    Assert.assertEquals(globalExpected, actual);
-  }
-
-  @Test(expected=IOException.class)
-  public void testInvalidReadClass() throws IOException {
-    wal.writeEntry(new WALEntry<Text>(new Text(""), 1));
-    wal.close();
-    new WAL<IntWritable>(dataDir, IntWritable.class);
-  }
-
-  @Test(expected=NullPointerException.class)
-  public void testCloseSingle() throws IOException {
-    wal.close();
-    wal.writeEntry(new WALEntry<Text>(new Text(""), 1));
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test(expected=NullPointerException.class)
-  public void testCloseList() throws IOException {
-    wal.close();
-    wal.writeEntries(Lists.newArrayList(new WALEntry<Text>(new Text(""), 1)));
-  }
-
-  @Test(expected=NullPointerException.class)
-  public void testCloseSequenceID() throws IOException {
-    wal.close();
-    wal.writeSequenceID(1L);
-  }
-
-  private static List<String> strings(int num) {
-    List<String> result = Lists.newArrayList();
-    for (int i = 0; i < num; i++) {
-      String s = Integer.toString(num);
-      result.add(s);
-    }
-    return result;
-  }
-  private static List<String> toStringList(List<WALEntry<Text>> list) {
-    List<String> result = Lists.newArrayList();
-    for(WALEntry<Text> entry : list) {
-      result.add(entry.getData().toString());
-    }
-    return result;
-  }
-
-  public static void main(String[] args) throws IOException {
-    Preconditions.checkPositionIndex(0, args.length,
-        "size  of event is a required arg");
-    Preconditions.checkPositionIndex(1, args.length,
-        "batch size is a required arg");
-
-    int size = Integer.parseInt(args[0]);
-    int batchSize = Integer.parseInt(args[1]);
-
-    byte[] buffer = new byte[size];
-    for (int i = 0; i < buffer.length; i++) {
-      buffer[i] = (byte)'A';
-    }
-    BytesWritable bytes = new BytesWritable(buffer);
-    List<WALEntry<BytesWritable>> batch = Lists.newArrayList();
-    long seqid = 0;
-    long numBytes = 0;
-    long count = 0;
-    long start = System.currentTimeMillis();
-    File dataDir = Files.createTempDir();
-    try {
-      WAL<BytesWritable>  wal = new WAL<BytesWritable>(dataDir, BytesWritable.class);
-      while(true) {
-        batch.clear();
-        for (int i = 0; i < batchSize; i++) {
-          batch.add(new  WALEntry<BytesWritable>(bytes, seqid++));
-        }
-        wal.writeEntries(batch);
-        count += batchSize;
-        numBytes += buffer.length * batchSize;
-
-        long expired = System.currentTimeMillis() - start;
-        if(expired > 10000L) {
-          start = System.currentTimeMillis();
-          System.out.println(String.format("Events/s %d, MB/s %4.2f", (count/10),
-              (double)(numBytes/1024L/1024L)/(double)(expired/1000L)));
-          numBytes = count = 0;
-        }
-      }
-    } finally {
-      FileUtils.deleteQuietly(dataDir);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties b/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties
deleted file mode 100644
index 739ecc8..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-log4j.rootLogger = INFO, out
-
-log4j.appender.out = org.apache.log4j.ConsoleAppender
-log4j.appender.out.layout = org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
-
-log4j.logger.org.apache.flume = DEBUG

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/pom.xml b/flume-ng-channels/pom.xml
index 5c6fa76..6485d47 100644
--- a/flume-ng-channels/pom.xml
+++ b/flume-ng-channels/pom.xml
@@ -43,6 +43,5 @@ limitations under the License.
   <modules>
     <module>flume-jdbc-channel</module>
     <module>flume-file-channel</module>
-    <module>flume-recoverable-memory-channel</module>
   </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
index 1e1a46f..26f4dd7 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
@@ -44,15 +44,7 @@ public class ChannelConfiguration extends ComponentConfiguration {
     /**
      * JDBC channel provided by org.apache.flume.channel.jdbc.JdbcChannel
      */
-    JDBC("org.apache.flume.conf.channel.JdbcChannelConfiguration"),
-
-    //For now, I am leaving just basic validation for recoverable
-    //memory channel. This should be updated when proper config stubs are added.
-    /**
-     * Recoverable Memory Channel
-     * @see org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
-     */
-    RECOVERABLEMEMORY("org.apache.flume.conf.channel.RecoverableMemoryChannelConfiguration");
+    JDBC("org.apache.flume.conf.channel.JdbcChannelConfiguration");
 
     private String channelConfigurationType;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
index 475341d..15b8cc3 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
@@ -44,15 +44,8 @@ public enum ChannelType {
   /**
    * JDBC channel provided by org.apache.flume.channel.jdbc.JdbcChannel
    */
-  JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
+  JDBC("org.apache.flume.channel.jdbc.JdbcChannel");
 
-  /**
-   * Recoverable Memory Channel
-   *
-   * @see org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
-   */
-  RECOVERABLEMEMORY(
-      "org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel");
 
   private final String channelClassName;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 7d7688c..f20ff1e 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -93,10 +93,6 @@
       <artifactId>flume-jdbc-channel</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.flume.flume-ng-channels</groupId>
-      <artifactId>flume-recoverable-memory-channel</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.flume.flume-ng-sinks</groupId>
       <artifactId>flume-hdfs-sink</artifactId>
     </dependency>