You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/05/05 22:34:59 UTC

svn commit: r1334488 [2/3] - in /incubator/flume/trunk: flume-ng-channels/flume-file-channel/ flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/...

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java Sat May  5 20:34:58 2012
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.flume.tools.DirectMemoryUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Represents a single data file on disk. Has methods to write,
+ * read sequentially (replay), and read randomly (channel takes).
+ */
+class LogFile {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(LogFile.class);
+  /**
+   * This class preallocates the data files 1MB at time to avoid
+   * the updating of the inode on each write and to avoid the disk
+   * filling up during a write. It's also faster, so there.
+   */
+  private static final ByteBuffer FILL = DirectMemoryUtils.
+      allocate(1024 * 1024); // preallocation, 1MB
+  public static final long MAX_FILE_SIZE =
+      Integer.MAX_VALUE - (1024L * 1024L);
+
+  private static final byte OP_RECORD = Byte.MAX_VALUE;
+  private static final byte OP_EOF = Byte.MIN_VALUE;
+
+  static {
+    for (int i = 0; i < FILL.capacity(); i++) {
+      FILL.put(OP_EOF);
+    }
+  }
+  private static final int VERSION = 1;
+
+
+  static class Writer {
+    private final int fileID;
+    private final File file;
+    private final long maxFileSize;
+    private final RandomAccessFile writeFileHandle;
+    private final FileChannel writeFileChannel;
+
+    private volatile boolean open;
+
+    Writer(File file, int logFileID, long maxFileSize) throws IOException {
+      this.file = file;
+      fileID = logFileID;
+      this.maxFileSize = Math.min(maxFileSize, MAX_FILE_SIZE);
+      writeFileHandle = new RandomAccessFile(file, "rw");
+      writeFileHandle.writeInt(VERSION);
+      writeFileHandle.writeInt(fileID);
+      writeFileChannel = writeFileHandle.getChannel();
+      writeFileChannel.force(true);
+      LOG.info("Opened " + file);
+      open = true;
+    }
+
+    String getParent() {
+      return file.getParent();
+    }
+    synchronized void close() {
+      if(open) {
+        open = false;
+        if(writeFileChannel.isOpen()) {
+          LOG.info("Closing " + file);
+          try {
+            writeFileChannel.force(false);
+          } catch (IOException e) {
+            LOG.warn("Unable to flush to disk", e);
+          }
+          try {
+            writeFileHandle.close();
+          } catch (IOException e) {
+            LOG.info("Unable to close", e);
+          }
+        }
+      }
+    }
+
+    synchronized long length() throws IOException {
+      return writeFileChannel.position();
+    }
+
+    synchronized FlumeEventPointer put(ByteBuffer buffer) throws IOException {
+      Pair<Integer, Integer> pair = write(buffer);
+      return new FlumeEventPointer(pair.getLeft(), pair.getRight());
+    }
+    synchronized void take(ByteBuffer buffer) throws IOException {
+      write(buffer);
+    }
+    synchronized void rollback(ByteBuffer buffer) throws IOException {
+      write(buffer);
+    }
+    synchronized void commit(ByteBuffer buffer) throws IOException {
+      write(buffer);
+      sync();
+    }
+
+    synchronized boolean isRollRequired(ByteBuffer buffer) throws IOException {
+      return open && length() + (long) buffer.capacity() > maxFileSize;
+    }
+
+    int getFileID() {
+      return fileID;
+    }
+    private void sync() throws IOException {
+      Preconditions.checkState(open, "File closed");
+      writeFileChannel.force(false);
+    }
+    private Pair<Integer, Integer> write(ByteBuffer buffer) throws IOException {
+      Preconditions.checkState(open, "File closed");
+      long length = length();
+      long expectedLength = length + (long) buffer.capacity();
+      Preconditions.checkArgument(expectedLength < (long) Integer.MAX_VALUE);
+      int offset = (int)length;
+      Preconditions.checkState(offset > 0);
+      preallocate(1 + buffer.capacity());
+      writeFileHandle.writeByte(OP_RECORD);
+      int wrote = writeFileChannel.write(buffer);
+      Preconditions.checkState(wrote == buffer.limit());
+      return Pair.of(fileID, offset);
+    }
+    private void preallocate(int size) throws IOException {
+      long position = writeFileChannel.position();
+      if(position + size > writeFileChannel.size()) {
+        LOG.debug("Preallocating at position " + position);
+        synchronized (FILL) {
+          FILL.position(0);
+          writeFileChannel.write(FILL, position);
+        }
+      }
+    }
+
+  }
+
+  static class RandomReader {
+    private final File file;
+    private final BlockingQueue<RandomAccessFile> readFileHandles =
+        new ArrayBlockingQueue<RandomAccessFile>(50, true);
+
+    private volatile boolean open;
+    public RandomReader(File file) throws IOException {
+      this.file = file;
+      readFileHandles.add(open());
+      open = true;
+    }
+    FlumeEvent get(int offset) throws IOException, InterruptedException {
+      Preconditions.checkState(open, "File closed");
+      RandomAccessFile fileHandle = checkOut();
+      boolean error = true;
+      try {
+        fileHandle.seek(offset);
+        byte operation = fileHandle.readByte();
+        Preconditions.checkState(operation == OP_RECORD);
+        TransactionEventRecord record = TransactionEventRecord.
+            fromDataInput(fileHandle);
+        if(!(record instanceof Put)) {
+          Preconditions.checkState(false, "Record is " +
+              record.getClass().getSimpleName());
+        }
+        error = false;
+        return ((Put)record).getEvent();
+      } finally {
+        if(error) {
+          close(fileHandle);
+        } else {
+          checkIn(fileHandle);
+        }
+      }
+    }
+    synchronized void close() {
+      if(open) {
+        open = false;
+        LOG.info("Closing RandomReader " + file);
+        List<RandomAccessFile> fileHandles = Lists.newArrayList();
+        while(readFileHandles.drainTo(fileHandles) > 0) {
+          for(RandomAccessFile fileHandle : fileHandles) {
+            synchronized (fileHandle) {
+              try {
+                fileHandle.close();
+              } catch (IOException e) {
+                LOG.info("Unable to close fileHandle for " + file);
+              }
+            }
+          }
+          fileHandles.clear();
+          try {
+            Thread.sleep(100L);
+          } catch (InterruptedException e) {
+            // this is uninterruptable
+          }
+        }
+      }
+    }
+    private RandomAccessFile open() throws IOException {
+      return new RandomAccessFile(file, "r");
+    }
+
+    private void checkIn(RandomAccessFile fileHandle) {
+      if(!readFileHandles.offer(fileHandle)) {
+        close(fileHandle);
+      }
+    }
+    private RandomAccessFile checkOut() 
+        throws IOException, InterruptedException {
+      RandomAccessFile fileHandle = readFileHandles.poll();
+      if(fileHandle != null) {
+        return fileHandle;
+      }
+      int remaining = readFileHandles.remainingCapacity();
+      if(remaining > 0) {
+        LOG.info("Opening " + file + " for read, remaining capacity is "
+            + remaining);
+        return open();
+      }
+      return readFileHandles.take();
+    }
+    private static void close(RandomAccessFile fileHandle) {
+      if(fileHandle != null) {
+        try {
+          fileHandle.close();
+        } catch (IOException e) {}
+      }
+    }
+  }
+
+  static class SequentialReader {
+    private final RandomAccessFile fileHandle;
+    private final FileChannel fileChannel;
+    private final int version;
+    private final int logFileID;
+
+    /**
+     * Construct a Sequential Log Reader object
+     * @param file
+     * @throws IOException if an I/O error occurs
+     * @throws EOFException if the file is empty
+     */
+    SequentialReader(File file) throws IOException, EOFException {
+      fileHandle = new RandomAccessFile(file, "r");
+      fileChannel = fileHandle.getChannel();
+      version = fileHandle.readInt();
+      if(version != VERSION) {
+        throw new IOException("Version is " + Integer.toHexString(version) +
+            " expected " + Integer.toHexString(VERSION));
+      }
+      logFileID = fileHandle.readInt();
+      Preconditions.checkArgument(logFileID >= 0, "LogFileID is not positive: "
+          + Integer.toHexString(logFileID));
+    }
+    int getVersion() {
+      return version;
+    }
+    int getLogFileID() {
+      return logFileID;
+    }
+    Pair<Integer, TransactionEventRecord> next() throws IOException {
+      try {
+        long position = fileChannel.position();
+        Preconditions.checkState(position < MAX_FILE_SIZE,
+            String.valueOf(position));
+        int offset = (int) position;
+        byte operation = fileHandle.readByte();
+        if(operation != OP_RECORD) {
+          return null;
+        }
+        TransactionEventRecord record = TransactionEventRecord.
+            fromDataInput(fileHandle);
+        Preconditions.checkState(offset > 0);
+        return Pair.of(offset, record);
+      } catch(EOFException e) {
+        return null;
+      }
+    }
+    void close() {
+      if(fileHandle != null) {
+        try {
+          fileHandle.close();
+        } catch (IOException e) {}
+      }
+    }
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java Sat May  5 20:34:58 2012
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+public class LogUtils {
+
+  /**
+   * Sort a list of files by the number after Log.PREFIX.
+   */
+  static void sort(List<File> logs) {
+    Collections.sort(logs, new Comparator<File>() {
+      @Override
+      public int compare(File file1, File file2) {
+        int id1 = getIDForFile(file1);
+        int id2 = getIDForFile(file2);
+        if (id1 > id2) {
+          return 1;
+        } else if (id1 == id2) {
+          return 0;
+        }
+        return -1;
+      }
+    });
+  }
+  /**
+   * Get the id after the Log.PREFIX
+   */
+  static int getIDForFile(File file) {
+    return Integer.parseInt(file.getName().substring(Log.PREFIX.length()));
+  }
+  /**
+   * Find all log files within a directory
+   *
+   * @param logDir directory to search
+   * @return List of data files within logDir
+   */
+  static List<File> getLogs(File logDir) {
+    List<File> result = Lists.newArrayList();
+    for (File file : logDir.listFiles()) {
+      String name = file.getName();
+      if (name.startsWith(Log.PREFIX)) {
+        result.add(file);
+      }
+    }
+    return result;
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java Sat May  5 20:34:58 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+class Pair<L,R> {
+
+  private final L left;
+  private final R right;
+  Pair(L l, R r) {
+    left = l;
+    right = r;
+  }
+  L getLeft() {
+    return left;
+  }
+  R getRight() {
+    return right;
+  }
+  static <L, R> Pair<L, R> of(L left, R right) {
+    return new Pair<L, R>(left, right);
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java Sat May  5 20:34:58 2012
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a Put on disk
+ */
+class Put extends TransactionEventRecord {
+  private FlumeEvent event;
+
+  Put(Long transactionID) {
+    super(transactionID);
+  }
+
+  Put(Long transactionID, FlumeEvent event) {
+    this(transactionID);
+    this.event = event;
+  }
+
+  FlumeEvent getEvent() {
+    return event;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    event = FlumeEvent.from(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    event.write(out);
+  }
+  @Override
+  public short getRecordType() {
+    return Type.PUT.get();
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java Sat May  5 20:34:58 2012
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.collections.MultiMap;
+import org.apache.commons.collections.map.MultiValueMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Processes a set of data logs, replaying said logs into the queue.
+ */
+class ReplayHandler {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(ReplayHandler.class);
+  private final FlumeEventQueue queue;
+  private final long lastCheckpoint;
+  /**
+   * This data structure stores takes for which we found a commit in the log
+   * files before we found a commit for the put. This can happen if the channel
+   * is configured for multiple directories.
+   *
+   * Consider the following:
+   *
+   * logdir1, logdir2
+   *
+   * Put goes to logdir2 Commit of Put goes to logdir2 Take goes to logdir1
+   * Commit of Take goes to logdir1
+   *
+   * When replaying we will start with log1 and find the take and commit before
+   * finding the put and commit in logdir2.
+   */
+  private final List<Long> pendingTakes;
+
+  ReplayHandler(FlumeEventQueue queue, long lastCheckpoint) {
+    this.queue = queue;
+    this.lastCheckpoint = lastCheckpoint;
+    pendingTakes = Lists.newArrayList();
+  }
+
+  void replayLog(List<File> logs) throws IOException {
+    int total = 0;
+    int count = 0;
+    MultiMap transactionMap = new MultiValueMap();
+    LOG.info("Starting replay of " + logs);
+    for (File log : logs) {
+      LOG.info("Replaying " + log);
+      LogFile.SequentialReader reader = null;
+      try {
+        reader = new LogFile.SequentialReader(log);
+        Pair<Integer, TransactionEventRecord> entry;
+        FlumeEventPointer ptr;
+        // for puts the fileId is the fileID of the file they exist in
+        // for takes the fileId and offset are pointers to a put
+        int fileId = reader.getLogFileID();
+        while ((entry = reader.next()) != null) {
+          int offset = entry.getLeft();
+          TransactionEventRecord record = entry.getRight();
+          short type = record.getRecordType();
+          long trans = record.getTransactionID();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("record.getTimestamp() = " + record.getTimestamp()
+                + ", lastCheckpoint = " + lastCheckpoint + ", fileId = "
+                + fileId + ", offset = " + offset + ", type = "
+                + TransactionEventRecord.getName(type) + ", transaction "
+                + trans);
+          }
+          if (record.getTimestamp() > lastCheckpoint) {
+            if (type == TransactionEventRecord.Type.PUT.get()) {
+              ptr = new FlumeEventPointer(fileId, offset);
+              transactionMap.put(trans, ptr);
+            } else if (type == TransactionEventRecord.Type.TAKE.get()) {
+              Take take = (Take) record;
+              ptr = new FlumeEventPointer(take.getFileID(), take.getOffset());
+              transactionMap.put(trans, ptr);
+            } else if (type == TransactionEventRecord.Type.ROLLBACK.get()) {
+              transactionMap.remove(trans);
+            } else if (type == TransactionEventRecord.Type.COMMIT.get()) {
+              @SuppressWarnings("unchecked")
+              Collection<FlumeEventPointer> pointers = 
+                (Collection<FlumeEventPointer>) transactionMap.remove(trans);
+              if (pointers != null && pointers.size() > 0) {
+                processCommit(((Commit) record).getType(), pointers);
+                count += pointers.size();
+              }
+            } else {
+              Preconditions.checkArgument(false, "Unknown record type: "
+                  + Integer.toHexString(type));
+            }
+
+          }
+        }
+        LOG.info("Replayed " + count + " from " + log);
+      } catch (EOFException e) {
+        LOG.warn("Hit EOF on " + log);
+      } finally {
+        total += count;
+        count = 0;
+        if (reader != null) {
+          reader.close();
+        }
+      }
+    }
+    int pendingTakesSize = pendingTakes.size();
+    if (pendingTakesSize > 0) {
+      String msg = "Pending takes " + pendingTakesSize
+          + " exist after the end of replay";
+      if (LOG.isDebugEnabled()) {
+        for (Long pointer : pendingTakes) {
+          LOG.debug("Pending take " + FlumeEventPointer.fromLong(pointer));
+        }
+        Preconditions.checkState(false, msg);
+      } else {
+        LOG.error(msg + ". Duplicate messages will exist in destination.");
+      }
+    }
+    LOG.info("Replayed " + total);
+  }
+
+  private void processCommit(short type, Collection<FlumeEventPointer> pointers) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing commit of " + TransactionEventRecord.getName(type));
+    }
+    if (type == TransactionEventRecord.Type.PUT.get()) {
+      for (FlumeEventPointer pointer : pointers) {
+        Preconditions.checkState(queue.addTail(pointer), "Unable to add "
+            + pointer);
+        if (pendingTakes.remove(pointer.toLong())) {
+          Preconditions.checkState(queue.remove(pointer),
+              "Take was pending and pointer was successfully added to the"
+                  + " queue but could not be removed: " + pointer);
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("Commited Put " + pointer);
+        }
+      }
+    } else if (type == TransactionEventRecord.Type.TAKE.get()) {
+      for (FlumeEventPointer pointer : pointers) {
+        boolean removed = queue.remove(pointer);
+        if (!removed) {
+          pendingTakes.add(pointer.toLong());
+          if (LOG.isDebugEnabled()) {
+            LOG.info("Unable to remove " + pointer + " added to pending list");
+          }
+        }
+      }
+    } else {
+      Preconditions.checkArgument(false,
+          "Unknown record type: " + Integer.toHexString(type));
+    }
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java Sat May  5 20:34:58 2012
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a Rollback on disk
+ */
+class Rollback extends TransactionEventRecord {
+  Rollback(Long transactionID) {
+    super(transactionID);
+  }
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+  }
+  @Override
+  short getRecordType() {
+    return Type.ROLLBACK.get();
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java Sat May  5 20:34:58 2012
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a Take on disk
+ */
+class Take extends TransactionEventRecord {
+  private int offset;
+  private int fileID;
+  Take(Long transactionID) {
+    super(transactionID);
+  }
+  Take(Long transactionID, int offset, int fileID) {
+    this(transactionID);
+    this.offset = offset;
+    this.fileID = fileID;
+  }
+  int getOffset() {
+    return offset;
+  }
+
+  int getFileID() {
+    return fileID;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    offset = in.readInt();
+    fileID = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(offset);
+    out.writeInt(fileID);
+  }
+  @Override
+  short getRecordType() {
+    return Type.TAKE.get();
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java Sat May  5 20:34:58 2012
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Base class for records in data file: Put, Take, Rollback, Commit
+ */
+abstract class TransactionEventRecord implements Writable {
+  private final long transactionID;
+  private long timestamp;
+
+  protected TransactionEventRecord(long transactionID) {
+    this.transactionID = transactionID;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    timestamp = in.readLong();
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(timestamp);
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+  public long getTimestamp() {
+    return timestamp;
+  }
+  long getTransactionID() {
+    return transactionID;
+  }
+
+  abstract short getRecordType();
+
+
+  /**
+   * Provides a minimum guarantee we are not reading complete junk
+   */
+  static final int MAGIC_HEADER = 0xdeadbeef;
+
+  static enum Type {
+    PUT((short)1),
+    TAKE((short)2),
+    ROLLBACK((short)3),
+    COMMIT((short)4);
+
+    private short id;
+    Type(short id) {
+      this.id = id;
+    }
+    public short get() {
+      return id;
+    }
+  }
+  private static final ImmutableMap<Short, Constructor<? extends TransactionEventRecord>> TYPES;
+
+  static {
+    ImmutableMap.Builder<Short, Constructor<? extends TransactionEventRecord>> builder =
+        ImmutableMap.<Short, Constructor<? extends TransactionEventRecord>>builder();
+    try {
+      builder.put(Type.PUT.get(),
+          Put.class.getDeclaredConstructor(Long.class));
+      builder.put(Type.TAKE.get(),
+          Take.class.getDeclaredConstructor(Long.class));
+      builder.put(Type.ROLLBACK.get(),
+          Rollback.class.getDeclaredConstructor(Long.class));
+      builder.put(Type.COMMIT.get(),
+          Commit.class.getDeclaredConstructor(Long.class));
+    } catch (Exception e) {
+      Throwables.propagate(e);
+    }
+    TYPES = builder.build();
+  }
+
+
+  static ByteBuffer toByteBuffer(TransactionEventRecord record) {
+    ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(512);
+    DataOutputStream dataOutput = new DataOutputStream(byteOutput);
+    try {
+      dataOutput.writeInt(MAGIC_HEADER);
+      dataOutput.writeShort(record.getRecordType());
+      dataOutput.writeLong(record.getTransactionID());
+      record.write(dataOutput);
+      dataOutput.flush();
+      // TODO toByteArray does an unneeded copy
+      return ByteBuffer.wrap(byteOutput.toByteArray());
+    } catch(IOException e) {
+      // near impossible
+      Throwables.propagate(e);
+    } finally {
+      if(dataOutput != null) {
+        try {
+          dataOutput.close();
+        } catch (IOException e) {}
+      }
+    }
+    throw new IllegalStateException(
+        "Should not occur as method should return or throw an exception");
+  }
+
+  static TransactionEventRecord fromDataInput(DataInput in) throws IOException {
+    int header = in.readInt();
+    if(header != MAGIC_HEADER) {
+      throw new IOException("Header " + Integer.toHexString(header) +
+          " not expected value: " + Integer.toHexString(MAGIC_HEADER));
+    }
+    short type = in.readShort();
+    long transactionID = in.readLong();
+    TransactionEventRecord entry = newRecordForType(type, transactionID);
+    entry.readFields(in);
+    return entry;
+  }
+
+  static String getName(short type) {
+    Constructor<? extends TransactionEventRecord> constructor = TYPES.get(type);
+    Preconditions.checkNotNull(constructor, "Unknown action " +
+        Integer.toHexString(type));
+    return constructor.getDeclaringClass().getSimpleName();
+  }
+
+  private static TransactionEventRecord newRecordForType(short type, long transactionID) {
+    Constructor<? extends TransactionEventRecord> constructor = TYPES.get(type);
+    Preconditions.checkNotNull(constructor, "Unknown action " +
+        Integer.toHexString(type));
+    try {
+      return constructor.newInstance(transactionID);
+    } catch (Exception e) {
+      Throwables.propagate(e);
+    }
+    throw new IllegalStateException(
+        "Should not occur as method should return or throw an exception");
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java Sat May  5 20:34:58 2012
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.util.List;
+
+import org.apache.flume.Sink;
+
+import com.google.common.collect.Lists;
+
+public class CountingSinkRunner extends Thread {
+  private int count;
+  private final int until;
+  private final Sink sink;
+  private volatile boolean run;
+  private final List<Exception> errors = Lists.newArrayList();
+  public CountingSinkRunner(Sink sink) {
+    this(sink, Integer.MAX_VALUE);
+  }
+  public CountingSinkRunner(Sink sink, int until) {
+    this.sink = sink;
+    this.until = until;
+  }
+  @Override
+  public void run() {
+    run = true;
+    while(run && count < until) {
+      boolean error = true;
+      try {
+        if(Sink.Status.READY.equals(sink.process())) {
+          count++;
+          error = false;
+        }
+      } catch(Exception ex) {
+        errors.add(ex);
+      }
+      if(error) {
+        try {
+          Thread.sleep(1000L);
+        } catch (InterruptedException e) {}
+      }
+    }
+  }
+  public void shutdown() {
+    run = false;
+  }
+  public int getCount() {
+    return count;
+  }
+  public List<Exception> getErrors() {
+    return errors;
+  }
+}
\ No newline at end of file

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java Sat May  5 20:34:58 2012
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.util.List;
+
+import org.apache.flume.Channel;
+import org.apache.flume.PollableSource;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+
+import com.google.common.collect.Lists;
+
+public class CountingSourceRunner extends Thread {
+  private int count;
+  private final int until;
+  private final PollableSource source;
+  private volatile boolean run;
+  private final List<Exception> errors = Lists.newArrayList();
+  public CountingSourceRunner(PollableSource source) {
+    this(source, Integer.MAX_VALUE);
+  }
+  public CountingSourceRunner(PollableSource source, int until) {
+    this(source, until, null);
+  }
+  public CountingSourceRunner(PollableSource source, Channel channel) {
+    this(source, Integer.MAX_VALUE, channel);
+  }
+  public CountingSourceRunner(PollableSource source, int until, Channel channel) {
+    this.source = source;
+    this.until = until;
+    if(channel != null) {
+      ReplicatingChannelSelector selector = new ReplicatingChannelSelector();
+      List<Channel> channels = Lists.newArrayList();
+      channels.add(channel);
+      selector.setChannels(channels);
+      this.source.setChannelProcessor(new ChannelProcessor(selector));
+    }
+  }
+  @Override
+  public void run() {
+    run = true;
+    while(run && count < until) {
+      boolean error = true;
+      try {
+        if(PollableSource.Status.READY.equals(source.process())) {
+          count++;
+          error = false;
+        }
+      } catch(Exception ex) {
+        errors.add(ex);
+      }
+      if(error) {
+        try {
+          Thread.sleep(1000L);
+        } catch (InterruptedException e) {}
+      }
+    }
+  }
+  public void shutdown() {
+    run = false;
+  }
+  public int getCount() {
+    return count;
+  }
+  public List<Exception> getErrors() {
+    return errors;
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java Sat May  5 20:34:58 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCheckpoint {
+
+  File file;
+  @Before
+  public void setup() throws IOException {
+    file = File.createTempFile("Checkpoint", "");
+    Assert.assertTrue(file.isFile());
+    Assert.assertTrue(file.canWrite());
+  }
+  @After
+  public void cleanup() {
+    file.delete();
+  }
+  @Test
+  public void testSerialization() throws IOException {
+    FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20);
+    FlumeEventQueue queueIn = new FlumeEventQueue(1);
+    queueIn.addHead(ptrIn);
+    Checkpoint checkpoint = new Checkpoint(file, 1);
+    Assert.assertEquals(0, checkpoint.getTimestamp());
+    checkpoint.write(queueIn);
+    FlumeEventQueue queueOut = checkpoint.read();
+    FlumeEventPointer ptrOut = queueOut.removeHead();
+    Assert.assertEquals(ptrIn, ptrOut);
+    Assert.assertTrue(checkpoint.getTimestamp() > 0);
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java?rev=1334488&r1=1334487&r2=1334488&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java Sat May  5 20:34:58 2012
@@ -21,191 +21,432 @@ package org.apache.flume.channel.file;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
-import org.apache.flume.channel.file.FileChannel.FileBackedTransaction;
+import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.sink.LoggerSink;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.SequenceGeneratorSource;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
 public class TestFileChannel {
 
-  private static final Logger logger = LoggerFactory
+  private static final Logger LOG = LoggerFactory
       .getLogger(TestFileChannel.class);
 
   private FileChannel channel;
+  private File checkpointDir;
+  private File[] dataDirs;
+  private String dataDir;
+  private final Context context = new Context();
 
   @Before
-  public void setUp() {
-    channel = new FileChannel();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testNoDirectory() {
-    Event event = EventBuilder.withBody("Test event".getBytes());
+  public void setup() {
+    checkpointDir = Files.createTempDir();
+    dataDirs = new File[3];
+    dataDir = "";
+    for (int i = 0; i < dataDirs.length; i++) {
+      dataDirs[i] = Files.createTempDir();
+      Assert.assertTrue(dataDirs[i].isDirectory());
+      dataDir += dataDirs[i].getAbsolutePath() + ",";
+    }
+    dataDir = dataDir.substring(0, dataDir.length() - 1);
+    channel = createFileChannel(1000);
 
-    channel.put(event);
   }
-
-  @Test(expected = IllegalStateException.class)
-  public void testNonExistantParent() {
-    Event event = EventBuilder.withBody("Test event".getBytes());
-
-    channel.setDirectory(new File("/i/do/not/exist"));
-    channel.put(event);
+  private FileChannel createFileChannel() {
+    return createFileChannel(FileChannelConfiguration.DEFAULT_CAPACITY);
+  }
+  private FileChannel createFileChannel(int capacity) {
+    FileChannel channel = new FileChannel();
+    context.put(FileChannelConfiguration.CHECKPOINT_DIR,
+        checkpointDir.getAbsolutePath());
+    context.put(FileChannelConfiguration.DATA_DIRS, dataDir);
+    context.put(FileChannelConfiguration.CAPACITY, String.valueOf(capacity));
+    Configurables.configure(channel, context);
+    channel.start();
+    return channel;
+  }
+  @After
+  public void teardown() {
+    if(channel != null) {
+      channel.stop();
+    }
+    FileUtils.deleteQuietly(checkpointDir);
+    for (int i = 0; i < dataDirs.length; i++) {
+      FileUtils.deleteQuietly(dataDirs[i]);
+    }
   }
-
   @Test
-  public void testGetTransaction() throws IOException {
-    File tmpDir = new File("/tmp/flume-fc-test-" + System.currentTimeMillis());
-    FileUtils.forceDeleteOnExit(tmpDir);
-
-    channel.setDirectory(tmpDir);
-
-    Transaction tx1 = channel.getTransaction();
-    Assert.assertNotNull(tx1);
-
-    Transaction tx2 = channel.getTransaction();
-    Assert.assertNotNull(tx2);
-
-    Assert.assertEquals(tx1, tx2);
-
-    tx2.begin();
-    Assert.assertEquals(tx2, channel.getTransaction());
-
-    tx2.rollback();
-    Assert.assertEquals(tx2, channel.getTransaction());
-
-    tx2.close();
-    Assert.assertFalse(tx2.equals(channel.getTransaction()));
+  public void testRestart() throws Exception {
+    List<String> in = Lists.newArrayList();
+    try {
+      while(true) {
+        in.addAll(putEvents(channel, "restart", 1, 1));
+      }
+    } catch (ChannelException e) {
+      Assert.assertEquals("Cannot acquire capacity", e.getMessage());
+    }
+    channel.stop();
+    channel = createFileChannel();
+    List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+    Collections.sort(in);
+    Collections.sort(out);
+    Assert.assertEquals(in, out);
   }
-
-  /**
-   * <p>
-   * Ensure two threads calling {@link FileChannel#getTransaction()} get
-   * different transactions back.
-   * </p>
-   * 
-   */
   @Test
-  public void testConcurrentGetTransaction() throws IOException,
-      InterruptedException, ExecutionException {
-    File tmpDir = new File("/tmp/flume-fc-test-" + System.currentTimeMillis());
-    FileUtils.forceDeleteOnExit(tmpDir);
-    final CyclicBarrier latch = new CyclicBarrier(2);
-
-    channel.setDirectory(tmpDir);
-
-    Callable<FileBackedTransaction> getTransRunnable = new Callable<FileBackedTransaction>() {
-
-      @Override
-      public FileBackedTransaction call() {
-        Transaction tx = null;
-
-        try {
-          /*
-           * Wait for all threads to pile up to prevent thread reuse in the
-           * pool.
-           */
-          latch.await();
-          tx = channel.getTransaction();
-          /*
-           * This await isn't strictly necessary but it guarantees both threads
-           * entered and exited getTransaction() in lock step which simplifies
-           * debugging.
-           */
-          latch.await();
-        } catch (InterruptedException e) {
-          logger.error("Interrupted while waiting for threads", e);
-          Assert.fail();
-        } catch (BrokenBarrierException e) {
-          logger.error("Barrier broken", e);
-          Assert.fail();
-        }
-
-        return (FileBackedTransaction) tx;
+  public void testReconfigure() throws Exception {
+    List<String> in = Lists.newArrayList();
+    try {
+      while(true) {
+        in.addAll(putEvents(channel, "restart", 1, 1));
       }
-
-    };
-
-    ExecutorService pool = Executors.newFixedThreadPool(2);
-
-    Future<FileBackedTransaction> f1 = pool.submit(getTransRunnable);
-    Future<FileBackedTransaction> f2 = pool.submit(getTransRunnable);
-
-    FileBackedTransaction t1 = f1.get();
-    FileBackedTransaction t2 = f2.get();
-
-    Assert.assertNotSame("Transactions from different threads are different",
-        t1, t2);
+    } catch (ChannelException e) {
+      Assert.assertEquals("Cannot acquire capacity", e.getMessage());
+    }
+    Configurables.configure(channel, context);
+    List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+    Collections.sort(in);
+    Collections.sort(out);
+    Assert.assertEquals(in, out);
   }
-
   @Test
-  public void testPut() throws IOException {
-    File tmpDir = new File("/tmp/flume-fc-test-" + System.currentTimeMillis());
-    FileUtils.forceDeleteOnExit(tmpDir);
-
-    if (!tmpDir.mkdirs()) {
-      throw new IOException("Unable to create test directory:" + tmpDir);
+  public void testPut() throws Exception {
+    // should find no items
+    int found = takeEvents(channel, 1, 5).size();
+    Assert.assertEquals(0, found);
+    List<String> expected = Lists.newArrayList();
+    expected.addAll(putEvents(channel, "unbatched", 1, 5));
+    expected.addAll(putEvents(channel, "batched", 5, 5));
+    List<String> actual = takeEvents(channel, 1);
+    Collections.sort(actual);
+    Collections.sort(expected);
+    Assert.assertEquals(expected, actual);
+  }
+  @Test
+  public void testRollbackAfterNoPutTake() throws Exception {
+    Transaction transaction;
+    transaction = channel.getTransaction();
+    transaction.begin();
+    transaction.rollback();
+    transaction.close();
+
+    // ensure we can reopen log with no error
+    channel.stop();
+    channel = createFileChannel();
+    transaction = channel.getTransaction();
+    transaction.begin();
+    Assert.assertNull(channel.take());
+    transaction.commit();
+    transaction.close();
+  }
+  @Test
+  public void testCommitAfterNoPutTake() throws Exception {
+    Transaction transaction;
+    transaction = channel.getTransaction();
+    transaction.begin();
+    transaction.commit();
+    transaction.close();
+
+    // ensure we can reopen log with no error
+    channel.stop();
+    channel = createFileChannel();
+    transaction = channel.getTransaction();
+    transaction.begin();
+    Assert.assertNull(channel.take());
+    transaction.commit();
+    transaction.close();
+  }
+  @Test
+  public void testCapacity() throws Exception {
+    channel.close();
+    channel = createFileChannel(5);
+    try {
+      putEvents(channel, "capacity", 1, 6);
+    } catch (ChannelException e) {
+      Assert.assertEquals("Cannot acquire capacity", e.getMessage());
     }
+    // take an event, roll it back, and
+    // then make sure a put fails
+    Transaction transaction;
+    transaction = channel.getTransaction();
+    transaction.begin();
+    Event event = channel.take();
+    Assert.assertNotNull(event);
+    transaction.rollback();
+    transaction.close();
+    // ensure the take the didn't change the state of the capacity
+    try {
+      putEvents(channel, "capacity", 1, 1);
+    } catch (ChannelException e) {
+      Assert.assertEquals("Cannot acquire capacity", e.getMessage());
+    }
+    // ensure we the events back
+    Assert.assertEquals(5, takeEvents(channel, 1, 5).size());
+  }
+  @Test
+  public void testRollbackSimulatedCrash() throws Exception {
+    int numEvents = 50;
+    List<String> in = putEvents(channel, "rollback", 1, numEvents);
+
+    Transaction transaction;
+    // put an item we will rollback
+    transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(EventBuilder.withBody("rolled back".getBytes(Charsets.UTF_8)));
+    transaction.rollback();
+    transaction.close();
+
+    // simulate crash
+    channel.stop();
+    channel = createFileChannel();
+
+    // we should not get the rolled back item
+    List<String> out = takeEvents(channel, 1, numEvents);
+    Collections.sort(in);
+    Collections.sort(out);
+    Assert.assertEquals(in, out);
+  }
+  @Test
+  public void testRollbackSimulatedCrashWithSink() throws Exception {
+    int numEvents = 100;
 
-    channel.setDirectory(tmpDir);
-
-    /* Issue five one record transactions. */
-    for (int i = 0; i < 5; i++) {
-      Transaction transaction = channel.getTransaction();
+    LoggerSink sink = new LoggerSink();
+    sink.setChannel(channel);
+    // sink will leave one item
+    CountingSinkRunner runner = new CountingSinkRunner(sink, numEvents - 1);
+    runner.start();
+    putEvents(channel, "rollback", 10, numEvents);
+
+    Transaction transaction;
+    // put an item we will rollback
+    transaction = channel.getTransaction();
+    transaction.begin();
+    byte[] bytes = "rolled back".getBytes(Charsets.UTF_8);
+    channel.put(EventBuilder.withBody(bytes));
+    transaction.rollback();
+    transaction.close();
 
-      Assert.assertNotNull(transaction);
+    while(runner.isAlive()) {
+      Thread.sleep(10L);
+    }
+    Assert.assertEquals(numEvents - 1, runner.getCount());
+    for(Exception ex : runner.getErrors()) {
+      LOG.warn("Sink had error", ex);
+    }
+    Assert.assertEquals(Collections.EMPTY_LIST, runner.getErrors());
 
-      try {
+    // simulate crash
+    channel.stop();
+    channel = createFileChannel();
+
+    List<String> out = takeEvents(channel, 1, 1);
+    Assert.assertEquals(1, out.size());
+    Assert.assertEquals("rollback-90-9", out.get(0));
+  }
+  @Test
+  public void testThreaded() throws IOException, InterruptedException {
+    int numThreads = 10;
+    final CountDownLatch startLatch = new CountDownLatch(numThreads * 2);
+    final CountDownLatch stopLatch = new CountDownLatch(numThreads * 2);
+    final List<Exception> errors = Collections
+        .synchronizedList(new ArrayList<Exception>());
+    final List<String> expected = Collections
+        .synchronizedList(new ArrayList<String>());
+    final List<String> actual = Collections
+        .synchronizedList(new ArrayList<String>());
+    for (int i = 0; i < numThreads; i++) {
+      final int id = i;
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            startLatch.countDown();
+            startLatch.await();
+            if (id % 2 == 0) {
+              expected.addAll(putEvents(channel, Integer.toString(id), 1, 5));
+            } else {
+              expected.addAll(putEvents(channel, Integer.toString(id), 5, 5));
+            }
+          } catch (Exception e) {
+            errors.add(e);
+          } finally {
+            stopLatch.countDown();
+          }
+        }
+      };
+      t.setDaemon(true);
+      t.start();
+    }
+    for (int i = 0; i < numThreads; i++) {
+      final int id = i;
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            startLatch.countDown();
+            startLatch.await();
+            Thread.sleep(100L); // ensure puts have started
+            if (id % 2 == 0) {
+              actual.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
+            } else {
+              actual.addAll(takeEvents(channel, 5, Integer.MAX_VALUE));
+            }
+          } catch (Exception e) {
+            errors.add(e);
+          } finally {
+            stopLatch.countDown();
+          }
+        }
+      };
+      t.setDaemon(true);
+      t.start();
+    }
+    Assert.assertTrue(stopLatch.await(30, TimeUnit.SECONDS));
+    Assert.assertEquals(Collections.EMPTY_LIST, errors);
+    Collections.sort(expected);
+    Collections.sort(actual);
+    Assert.assertEquals(expected, actual);
+  }
+  @Test(expected=IOException.class)
+  public void testLocking() throws IOException {
+    try {
+      createFileChannel();
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      Assert.assertNotNull(cause);
+      Assert.assertTrue(cause.getClass().getSimpleName(),
+          cause instanceof IOException);
+      String msg = cause.getMessage();
+      Assert.assertNotNull(msg);
+      Assert.assertTrue(msg.endsWith("The directory is already locked."));
+      throw (IOException)cause;
+    }
+  }
+  @Test
+  public void testIntegration() throws IOException, InterruptedException {
+    // set shorter checkpoint and filesize to ensure
+    // checkpoints and rolls occur during the test
+    context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL,
+        String.valueOf(10L * 1000L));
+    context.put(FileChannelConfiguration.MAX_FILE_SIZE,
+        String.valueOf(1024 * 1024 * 5));
+    // do reconfiguration
+    Configurables.configure(channel, context);
+
+    SequenceGeneratorSource source = new SequenceGeneratorSource();
+    CountingSourceRunner sourceRunner = new CountingSourceRunner(source, channel);
+
+    NullSink sink = new NullSink();
+    sink.setChannel(channel);
+    CountingSinkRunner sinkRunner = new CountingSinkRunner(sink);
+
+    sinkRunner.start();
+    sourceRunner.start();
+    Thread.sleep(TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES));
+    // shutdown source
+    sourceRunner.shutdown();
+    while(sourceRunner.isAlive()) {
+      Thread.sleep(10L);
+    }
+    // wait for queue to clear
+    while(channel.getDepth() > 0) {
+      Thread.sleep(10L);
+    }
+    // shutdown size
+    sinkRunner.shutdown();
+    // wait a few seconds
+    Thread.sleep(TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS));
+    List<File> logs = Lists.newArrayList();
+    for (int i = 0; i < dataDirs.length; i++) {
+      logs.addAll(LogUtils.getLogs(dataDirs[i]));
+    }
+    LOG.info("Total Number of Logs = " + logs.size());
+    for(File logFile : logs) {
+      LOG.info("LogFile = " + logFile);
+    }
+    LOG.info("Source processed " + sinkRunner.getCount());
+    LOG.info("Sink processed " + sourceRunner.getCount());
+    for(Exception ex : sourceRunner.getErrors()) {
+      LOG.warn("Source had error", ex);
+    }
+    for(Exception ex : sinkRunner.getErrors()) {
+      LOG.warn("Sink had error", ex);
+    }
+    Assert.assertEquals(sinkRunner.getCount(), sinkRunner.getCount());
+    Assert.assertEquals(Collections.EMPTY_LIST, sinkRunner.getErrors());
+    Assert.assertEquals(Collections.EMPTY_LIST, sourceRunner.getErrors());
+  }
+  private static List<String> takeEvents(Channel channel,
+      int batchSize) throws Exception {
+    return takeEvents(channel, batchSize, Integer.MAX_VALUE);
+  }
+  private static List<String> takeEvents(Channel channel,
+      int batchSize, int numEvents) throws Exception {
+    List<String> result = Lists.newArrayList();
+    for (int i = 0; i < numEvents; i += batchSize) {
+      for (int j = 0; j < batchSize; j++) {
+        Transaction transaction = channel.getTransaction();
         transaction.begin();
-
-        Event event = EventBuilder.withBody(("Test event" + i).getBytes());
-        channel.put(event);
-
-        transaction.commit();
-      } catch (Exception e) {
-        logger.error(
-            "Failed to put event into file channel. Exception follows.", e);
-        transaction.rollback();
-        Assert.fail();
-      } finally {
-        transaction.close();
+        try {
+          Event event = channel.take();
+          if(event == null) {
+            transaction.commit();
+            return result;
+          }
+          result.add(new String(event.getBody(), Charsets.UTF_8));
+          transaction.commit();
+        } catch (Exception ex) {
+          transaction.rollback();
+          throw ex;
+        } finally {
+          transaction.close();
+        }
       }
     }
-
-    /* Issue one five record transaction. */
-    Transaction transaction = channel.getTransaction();
-
-    try {
-      transaction.begin();
-
-      for (int i = 0; i < 5; i++) {
-        Event event = EventBuilder.withBody(("Test event" + i).getBytes());
-        channel.put(event);
+    return result;
+  }
+  private static List<String> putEvents(Channel channel, String prefix,
+      int batchSize, int numEvents) throws Exception {
+    List<String> result = Lists.newArrayList();
+    for (int i = 0; i < numEvents; i += batchSize) {
+      for (int j = 0; j < batchSize; j++) {
+        Transaction transaction = channel.getTransaction();
+        transaction.begin();
+        try {
+          String s = prefix + "-" + i +"-" + j;
+          Event event = EventBuilder.withBody(s.getBytes(Charsets.UTF_8));
+          result.add(s);
+          channel.put(event);
+          transaction.commit();
+        } catch (Exception ex) {
+          transaction.rollback();
+          throw ex;
+        } finally {
+          transaction.close();
+        }
       }
-
-      transaction.commit();
-    } catch (Exception e) {
-      logger.error("Failed to put event into file channel. Exception follows.",
-          e);
-      transaction.rollback();
-      Assert.fail();
-    } finally {
-      transaction.close();
     }
+    return result;
   }
-
-}
+}
\ No newline at end of file

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEvent.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEvent.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEvent.java Sat May  5 20:34:58 2012
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+
+public class TestFlumeEvent {
+
+  @Test
+  public void testBasics() {
+    Map<String, String> headers = Maps.newHashMap();
+    headers.put("key", "value");
+    byte[] body = "flume".getBytes(Charsets.UTF_8);
+    FlumeEvent event = new FlumeEvent(headers, body);
+    Assert.assertEquals(headers, event.getHeaders());
+    Assert.assertTrue(Arrays.equals(body, event.getBody()));
+  }
+
+  @Test
+  public void testSerialization() throws IOException {
+    Map<String, String> headers = Maps.newHashMap();
+    headers.put("key", "value");
+    byte[] body = "flume".getBytes(Charsets.UTF_8);
+    FlumeEvent in = new FlumeEvent(headers, body);
+    FlumeEvent out = FlumeEvent.from(TestUtils.toDataInput(in));
+    Assert.assertEquals(headers, out.getHeaders());
+    Assert.assertTrue(Arrays.equals(body, out.getBody()));
+    in.setHeaders(null);
+    in.setBody(null);
+    out = FlumeEvent.from(TestUtils.toDataInput(in));
+    Assert.assertEquals(Maps.newHashMap(), out.getHeaders());
+    Assert.assertNull(out.getBody());
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventPointer.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventPointer.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventPointer.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventPointer.java Sat May  5 20:34:58 2012
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+public class TestFlumeEventPointer {
+
+  @Test
+  public void testGetter() {
+    FlumeEventPointer pointer = new FlumeEventPointer(1, 1);
+    Assert.assertEquals(1, pointer.getFileID());
+    Assert.assertEquals(1, pointer.getOffset());
+  }
+  @Test
+  public void testEquals() {
+    FlumeEventPointer pointerA = new FlumeEventPointer(1, 1);
+    FlumeEventPointer pointerB = new FlumeEventPointer(1, 1);
+    Assert.assertEquals(pointerA, pointerB);
+    Assert.assertEquals(pointerB, pointerA);
+    pointerA = new FlumeEventPointer(1, 1);
+    pointerB = new FlumeEventPointer(2, 2);
+    Assert.assertFalse(pointerA.equals(pointerB));
+    Assert.assertFalse(pointerB.equals(pointerA));
+  }
+  @Test
+  public void testHashCode() {
+    FlumeEventPointer pointerA = new FlumeEventPointer(1, 1);
+    FlumeEventPointer pointerB = new FlumeEventPointer(1, 1);
+    Assert.assertEquals(pointerA.hashCode(), pointerB.hashCode());
+    pointerA = new FlumeEventPointer(1, 1);
+    pointerB = new FlumeEventPointer(2, 2);
+    Assert.assertFalse(pointerA.hashCode() == pointerB.hashCode());
+  }
+
+  @Test
+  public void testPack() {
+    FlumeEventPointer pointerA = new FlumeEventPointer(1, 1);
+    FlumeEventPointer pointerB = new FlumeEventPointer(1, 2);
+    Assert.assertEquals(4294967297L, pointerA.toLong());
+    Assert.assertEquals(4294967298L, pointerB.toLong());
+    Assert.assertEquals(pointerA, FlumeEventPointer.fromLong(pointerA.toLong()));
+    Assert.assertEquals(pointerB, FlumeEventPointer.fromLong(pointerB.toLong()));
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventPointer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java Sat May  5 20:34:58 2012
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestFlumeEventQueue {
+
+  FlumeEventPointer pointer1 = new FlumeEventPointer(1, 1);
+  FlumeEventPointer pointer2 = new FlumeEventPointer(2, 2);
+  FlumeEventQueue queue;
+  @Before
+  public void setup() {
+    queue = new FlumeEventQueue(1000);
+  }
+  @Test
+  public void testQueueIsEmptyAfterCreation() {
+    Assert.assertNull(queue.removeHead());
+  }
+  @Test
+  public void testCapacity() {
+    queue = new FlumeEventQueue(1);
+    Assert.assertTrue(queue.addTail(pointer1));
+    Assert.assertFalse(queue.addTail(pointer2));
+  }
+  @Test(expected=IllegalArgumentException.class)
+  public void testInvalidCapacityZero() {
+    queue = new FlumeEventQueue(0);
+  }
+  @Test(expected=IllegalArgumentException.class)
+  public void testInvalidCapacityNegative() {
+    queue = new FlumeEventQueue(-1);
+  }
+  @Test
+  public void addTail1() {
+    Assert.assertTrue(queue.addTail(pointer1));
+    Assert.assertEquals(pointer1, queue.removeHead());
+    Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+  }
+  @Test
+  public void addTail2() {
+    Assert.assertTrue(queue.addTail(pointer1));
+    Assert.assertTrue(queue.addTail(pointer2));
+    Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
+    Assert.assertEquals(pointer1, queue.removeHead());
+    Assert.assertEquals(Sets.newHashSet(2), queue.getFileIDs());
+  }
+  @Test
+  public void addTailLarge() {
+    int size = 500;
+    Set<Integer> fileIDs = Sets.newHashSet();
+    for (int i = 1; i <= size; i++) {
+      Assert.assertTrue(queue.addTail(new FlumeEventPointer(i, i)));
+      fileIDs.add(i);
+      Assert.assertEquals(fileIDs, queue.getFileIDs());
+    }
+    for (int i = 1; i <= size; i++) {
+      Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead());
+      fileIDs.remove(i);
+      Assert.assertEquals(fileIDs, queue.getFileIDs());
+    }
+    Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+  }
+  @Test
+  public void addHead1() {
+    Assert.assertTrue(queue.addHead(pointer1));
+    Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
+    Assert.assertEquals(pointer1, queue.removeHead());
+    Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+  }
+  @Test
+  public void addHead2() {
+    Assert.assertTrue(queue.addHead(pointer1));
+    Assert.assertTrue(queue.addHead(pointer2));
+    Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
+    Assert.assertEquals(pointer2, queue.removeHead());
+    Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
+  }
+  @Test
+  public void addHeadLarge() {
+    int size = 500;
+    Set<Integer> fileIDs = Sets.newHashSet();
+    for (int i = 1; i <= size; i++) {
+      Assert.assertTrue(queue.addHead(new FlumeEventPointer(i, i)));
+      fileIDs.add(i);
+      Assert.assertEquals(fileIDs, queue.getFileIDs());
+    }
+    for (int i = size; i > 0; i--) {
+      Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead());
+      fileIDs.remove(i);
+      Assert.assertEquals(fileIDs, queue.getFileIDs());
+    }
+    Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+  }
+  @Test
+  public void addTailRemove1() {
+    Assert.assertTrue(queue.addTail(pointer1));
+    Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
+    Assert.assertTrue(queue.remove(pointer1));
+    Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+    Assert.assertNull(queue.removeHead());
+    Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+  }
+
+  @Test
+  public void addTailRemove2() {
+    Assert.assertTrue(queue.addTail(pointer1));
+    Assert.assertTrue(queue.addTail(pointer2));
+    Assert.assertTrue(queue.remove(pointer1));
+    Assert.assertEquals(pointer2, queue.removeHead());
+  }
+
+  @Test
+  public void addHeadRemove1() {
+    queue.addHead(pointer1);
+    Assert.assertTrue(queue.remove(pointer1));
+    Assert.assertNull(queue.removeHead());
+  }
+  @Test
+  public void addHeadRemove2() {
+    Assert.assertTrue(queue.addHead(pointer1));
+    Assert.assertTrue(queue.addHead(pointer2));
+    Assert.assertTrue(queue.remove(pointer1));
+    Assert.assertEquals(pointer2, queue.removeHead());
+  }
+  @Test
+  public void testWrappingCorrectly() {
+    int size = Integer.MAX_VALUE;
+    for (int i = 1; i <= size; i++) {
+      if(!queue.addHead(new FlumeEventPointer(i, i))) {
+        break;
+      }
+    }
+    for (int i = queue.size()/2; i > 0; i--) {
+      Assert.assertNotNull(queue.removeHead());
+    }
+    // addHead below would throw an IndexOOBounds with
+    // bad version of FlumeEventQueue.convert
+    for (int i = 1; i <= size; i++) {
+      if(!queue.addHead(new FlumeEventPointer(i, i))) {
+        break;
+      }
+    }
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native