You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/05/05 22:34:59 UTC
svn commit: r1334488 [2/3] - in /incubator/flume/trunk:
flume-ng-channels/flume-file-channel/
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/...
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java Sat May 5 20:34:58 2012
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.flume.tools.DirectMemoryUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Represents a single data file on disk. Has methods to write,
+ * read sequentially (replay), and read randomly (channel takes).
+ */
+class LogFile {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(LogFile.class);
+ /**
+ * This class preallocates the data files 1MB at time to avoid
+ * the updating of the inode on each write and to avoid the disk
+ * filling up during a write. It's also faster, so there.
+ */
+ private static final ByteBuffer FILL = DirectMemoryUtils.
+ allocate(1024 * 1024); // preallocation, 1MB
+ public static final long MAX_FILE_SIZE =
+ Integer.MAX_VALUE - (1024L * 1024L);
+
+ private static final byte OP_RECORD = Byte.MAX_VALUE;
+ private static final byte OP_EOF = Byte.MIN_VALUE;
+
+ static {
+ for (int i = 0; i < FILL.capacity(); i++) {
+ FILL.put(OP_EOF);
+ }
+ }
+ private static final int VERSION = 1;
+
+
+ static class Writer {
+ private final int fileID;
+ private final File file;
+ private final long maxFileSize;
+ private final RandomAccessFile writeFileHandle;
+ private final FileChannel writeFileChannel;
+
+ private volatile boolean open;
+
+ Writer(File file, int logFileID, long maxFileSize) throws IOException {
+ this.file = file;
+ fileID = logFileID;
+ this.maxFileSize = Math.min(maxFileSize, MAX_FILE_SIZE);
+ writeFileHandle = new RandomAccessFile(file, "rw");
+ writeFileHandle.writeInt(VERSION);
+ writeFileHandle.writeInt(fileID);
+ writeFileChannel = writeFileHandle.getChannel();
+ writeFileChannel.force(true);
+ LOG.info("Opened " + file);
+ open = true;
+ }
+
+ String getParent() {
+ return file.getParent();
+ }
+ synchronized void close() {
+ if(open) {
+ open = false;
+ if(writeFileChannel.isOpen()) {
+ LOG.info("Closing " + file);
+ try {
+ writeFileChannel.force(false);
+ } catch (IOException e) {
+ LOG.warn("Unable to flush to disk", e);
+ }
+ try {
+ writeFileHandle.close();
+ } catch (IOException e) {
+ LOG.info("Unable to close", e);
+ }
+ }
+ }
+ }
+
+ synchronized long length() throws IOException {
+ return writeFileChannel.position();
+ }
+
+ synchronized FlumeEventPointer put(ByteBuffer buffer) throws IOException {
+ Pair<Integer, Integer> pair = write(buffer);
+ return new FlumeEventPointer(pair.getLeft(), pair.getRight());
+ }
+ synchronized void take(ByteBuffer buffer) throws IOException {
+ write(buffer);
+ }
+ synchronized void rollback(ByteBuffer buffer) throws IOException {
+ write(buffer);
+ }
+ synchronized void commit(ByteBuffer buffer) throws IOException {
+ write(buffer);
+ sync();
+ }
+
+ synchronized boolean isRollRequired(ByteBuffer buffer) throws IOException {
+ return open && length() + (long) buffer.capacity() > maxFileSize;
+ }
+
+ int getFileID() {
+ return fileID;
+ }
+ private void sync() throws IOException {
+ Preconditions.checkState(open, "File closed");
+ writeFileChannel.force(false);
+ }
+ private Pair<Integer, Integer> write(ByteBuffer buffer) throws IOException {
+ Preconditions.checkState(open, "File closed");
+ long length = length();
+ long expectedLength = length + (long) buffer.capacity();
+ Preconditions.checkArgument(expectedLength < (long) Integer.MAX_VALUE);
+ int offset = (int)length;
+ Preconditions.checkState(offset > 0);
+ preallocate(1 + buffer.capacity());
+ writeFileHandle.writeByte(OP_RECORD);
+ int wrote = writeFileChannel.write(buffer);
+ Preconditions.checkState(wrote == buffer.limit());
+ return Pair.of(fileID, offset);
+ }
+ private void preallocate(int size) throws IOException {
+ long position = writeFileChannel.position();
+ if(position + size > writeFileChannel.size()) {
+ LOG.debug("Preallocating at position " + position);
+ synchronized (FILL) {
+ FILL.position(0);
+ writeFileChannel.write(FILL, position);
+ }
+ }
+ }
+
+ }
+
+ static class RandomReader {
+ private final File file;
+ private final BlockingQueue<RandomAccessFile> readFileHandles =
+ new ArrayBlockingQueue<RandomAccessFile>(50, true);
+
+ private volatile boolean open;
+ public RandomReader(File file) throws IOException {
+ this.file = file;
+ readFileHandles.add(open());
+ open = true;
+ }
+ FlumeEvent get(int offset) throws IOException, InterruptedException {
+ Preconditions.checkState(open, "File closed");
+ RandomAccessFile fileHandle = checkOut();
+ boolean error = true;
+ try {
+ fileHandle.seek(offset);
+ byte operation = fileHandle.readByte();
+ Preconditions.checkState(operation == OP_RECORD);
+ TransactionEventRecord record = TransactionEventRecord.
+ fromDataInput(fileHandle);
+ if(!(record instanceof Put)) {
+ Preconditions.checkState(false, "Record is " +
+ record.getClass().getSimpleName());
+ }
+ error = false;
+ return ((Put)record).getEvent();
+ } finally {
+ if(error) {
+ close(fileHandle);
+ } else {
+ checkIn(fileHandle);
+ }
+ }
+ }
+ synchronized void close() {
+ if(open) {
+ open = false;
+ LOG.info("Closing RandomReader " + file);
+ List<RandomAccessFile> fileHandles = Lists.newArrayList();
+ while(readFileHandles.drainTo(fileHandles) > 0) {
+ for(RandomAccessFile fileHandle : fileHandles) {
+ synchronized (fileHandle) {
+ try {
+ fileHandle.close();
+ } catch (IOException e) {
+ LOG.info("Unable to close fileHandle for " + file);
+ }
+ }
+ }
+ fileHandles.clear();
+ try {
+ Thread.sleep(100L);
+ } catch (InterruptedException e) {
+ // this is uninterruptable
+ }
+ }
+ }
+ }
+ private RandomAccessFile open() throws IOException {
+ return new RandomAccessFile(file, "r");
+ }
+
+ private void checkIn(RandomAccessFile fileHandle) {
+ if(!readFileHandles.offer(fileHandle)) {
+ close(fileHandle);
+ }
+ }
+ private RandomAccessFile checkOut()
+ throws IOException, InterruptedException {
+ RandomAccessFile fileHandle = readFileHandles.poll();
+ if(fileHandle != null) {
+ return fileHandle;
+ }
+ int remaining = readFileHandles.remainingCapacity();
+ if(remaining > 0) {
+ LOG.info("Opening " + file + " for read, remaining capacity is "
+ + remaining);
+ return open();
+ }
+ return readFileHandles.take();
+ }
+ private static void close(RandomAccessFile fileHandle) {
+ if(fileHandle != null) {
+ try {
+ fileHandle.close();
+ } catch (IOException e) {}
+ }
+ }
+ }
+
+ static class SequentialReader {
+ private final RandomAccessFile fileHandle;
+ private final FileChannel fileChannel;
+ private final int version;
+ private final int logFileID;
+
+ /**
+ * Construct a Sequential Log Reader object
+ * @param file
+ * @throws IOException if an I/O error occurs
+ * @throws EOFException if the file is empty
+ */
+ SequentialReader(File file) throws IOException, EOFException {
+ fileHandle = new RandomAccessFile(file, "r");
+ fileChannel = fileHandle.getChannel();
+ version = fileHandle.readInt();
+ if(version != VERSION) {
+ throw new IOException("Version is " + Integer.toHexString(version) +
+ " expected " + Integer.toHexString(VERSION));
+ }
+ logFileID = fileHandle.readInt();
+ Preconditions.checkArgument(logFileID >= 0, "LogFileID is not positive: "
+ + Integer.toHexString(logFileID));
+ }
+ int getVersion() {
+ return version;
+ }
+ int getLogFileID() {
+ return logFileID;
+ }
+ Pair<Integer, TransactionEventRecord> next() throws IOException {
+ try {
+ long position = fileChannel.position();
+ Preconditions.checkState(position < MAX_FILE_SIZE,
+ String.valueOf(position));
+ int offset = (int) position;
+ byte operation = fileHandle.readByte();
+ if(operation != OP_RECORD) {
+ return null;
+ }
+ TransactionEventRecord record = TransactionEventRecord.
+ fromDataInput(fileHandle);
+ Preconditions.checkState(offset > 0);
+ return Pair.of(offset, record);
+ } catch(EOFException e) {
+ return null;
+ }
+ }
+ void close() {
+ if(fileHandle != null) {
+ try {
+ fileHandle.close();
+ } catch (IOException e) {}
+ }
+ }
+ }
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java Sat May 5 20:34:58 2012
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+public class LogUtils {
+
+ /**
+ * Sort a list of files by the number after Log.PREFIX.
+ */
+ static void sort(List<File> logs) {
+ Collections.sort(logs, new Comparator<File>() {
+ @Override
+ public int compare(File file1, File file2) {
+ int id1 = getIDForFile(file1);
+ int id2 = getIDForFile(file2);
+ if (id1 > id2) {
+ return 1;
+ } else if (id1 == id2) {
+ return 0;
+ }
+ return -1;
+ }
+ });
+ }
+ /**
+ * Get the id after the Log.PREFIX
+ */
+ static int getIDForFile(File file) {
+ return Integer.parseInt(file.getName().substring(Log.PREFIX.length()));
+ }
+ /**
+ * Find all log files within a directory
+ *
+ * @param logDir directory to search
+ * @return List of data files within logDir
+ */
+ static List<File> getLogs(File logDir) {
+ List<File> result = Lists.newArrayList();
+ for (File file : logDir.listFiles()) {
+ String name = file.getName();
+ if (name.startsWith(Log.PREFIX)) {
+ result.add(file);
+ }
+ }
+ return result;
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java Sat May 5 20:34:58 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+class Pair<L,R> {
+
+ private final L left;
+ private final R right;
+ Pair(L l, R r) {
+ left = l;
+ right = r;
+ }
+ L getLeft() {
+ return left;
+ }
+ R getRight() {
+ return right;
+ }
+ static <L, R> Pair<L, R> of(L left, R right) {
+ return new Pair<L, R>(left, right);
+ }
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java Sat May 5 20:34:58 2012
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a Put on disk
+ */
+class Put extends TransactionEventRecord {
+ private FlumeEvent event;
+
+ Put(Long transactionID) {
+ super(transactionID);
+ }
+
+ Put(Long transactionID, FlumeEvent event) {
+ this(transactionID);
+ this.event = event;
+ }
+
+ FlumeEvent getEvent() {
+ return event;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ event = FlumeEvent.from(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ event.write(out);
+ }
+ @Override
+ public short getRecordType() {
+ return Type.PUT.get();
+ }
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java Sat May 5 20:34:58 2012
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.collections.MultiMap;
+import org.apache.commons.collections.map.MultiValueMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Processes a set of data logs, replaying said logs into the queue.
+ */
+class ReplayHandler {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(ReplayHandler.class);
+ private final FlumeEventQueue queue;
+ private final long lastCheckpoint;
+ /**
+ * This data structure stores takes for which we found a commit in the log
+ * files before we found a commit for the put. This can happen if the channel
+ * is configured for multiple directories.
+ *
+ * Consider the following:
+ *
+ * logdir1, logdir2
+ *
+ * Put goes to logdir2 Commit of Put goes to logdir2 Take goes to logdir1
+ * Commit of Take goes to logdir1
+ *
+ * When replaying we will start with log1 and find the take and commit before
+ * finding the put and commit in logdir2.
+ */
+ private final List<Long> pendingTakes;
+
+ ReplayHandler(FlumeEventQueue queue, long lastCheckpoint) {
+ this.queue = queue;
+ this.lastCheckpoint = lastCheckpoint;
+ pendingTakes = Lists.newArrayList();
+ }
+
+ void replayLog(List<File> logs) throws IOException {
+ int total = 0;
+ int count = 0;
+ MultiMap transactionMap = new MultiValueMap();
+ LOG.info("Starting replay of " + logs);
+ for (File log : logs) {
+ LOG.info("Replaying " + log);
+ LogFile.SequentialReader reader = null;
+ try {
+ reader = new LogFile.SequentialReader(log);
+ Pair<Integer, TransactionEventRecord> entry;
+ FlumeEventPointer ptr;
+ // for puts the fileId is the fileID of the file they exist in
+ // for takes the fileId and offset are pointers to a put
+ int fileId = reader.getLogFileID();
+ while ((entry = reader.next()) != null) {
+ int offset = entry.getLeft();
+ TransactionEventRecord record = entry.getRight();
+ short type = record.getRecordType();
+ long trans = record.getTransactionID();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("record.getTimestamp() = " + record.getTimestamp()
+ + ", lastCheckpoint = " + lastCheckpoint + ", fileId = "
+ + fileId + ", offset = " + offset + ", type = "
+ + TransactionEventRecord.getName(type) + ", transaction "
+ + trans);
+ }
+ if (record.getTimestamp() > lastCheckpoint) {
+ if (type == TransactionEventRecord.Type.PUT.get()) {
+ ptr = new FlumeEventPointer(fileId, offset);
+ transactionMap.put(trans, ptr);
+ } else if (type == TransactionEventRecord.Type.TAKE.get()) {
+ Take take = (Take) record;
+ ptr = new FlumeEventPointer(take.getFileID(), take.getOffset());
+ transactionMap.put(trans, ptr);
+ } else if (type == TransactionEventRecord.Type.ROLLBACK.get()) {
+ transactionMap.remove(trans);
+ } else if (type == TransactionEventRecord.Type.COMMIT.get()) {
+ @SuppressWarnings("unchecked")
+ Collection<FlumeEventPointer> pointers =
+ (Collection<FlumeEventPointer>) transactionMap.remove(trans);
+ if (pointers != null && pointers.size() > 0) {
+ processCommit(((Commit) record).getType(), pointers);
+ count += pointers.size();
+ }
+ } else {
+ Preconditions.checkArgument(false, "Unknown record type: "
+ + Integer.toHexString(type));
+ }
+
+ }
+ }
+ LOG.info("Replayed " + count + " from " + log);
+ } catch (EOFException e) {
+ LOG.warn("Hit EOF on " + log);
+ } finally {
+ total += count;
+ count = 0;
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ int pendingTakesSize = pendingTakes.size();
+ if (pendingTakesSize > 0) {
+ String msg = "Pending takes " + pendingTakesSize
+ + " exist after the end of replay";
+ if (LOG.isDebugEnabled()) {
+ for (Long pointer : pendingTakes) {
+ LOG.debug("Pending take " + FlumeEventPointer.fromLong(pointer));
+ }
+ Preconditions.checkState(false, msg);
+ } else {
+ LOG.error(msg + ". Duplicate messages will exist in destination.");
+ }
+ }
+ LOG.info("Replayed " + total);
+ }
+
+ private void processCommit(short type, Collection<FlumeEventPointer> pointers) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing commit of " + TransactionEventRecord.getName(type));
+ }
+ if (type == TransactionEventRecord.Type.PUT.get()) {
+ for (FlumeEventPointer pointer : pointers) {
+ Preconditions.checkState(queue.addTail(pointer), "Unable to add "
+ + pointer);
+ if (pendingTakes.remove(pointer.toLong())) {
+ Preconditions.checkState(queue.remove(pointer),
+ "Take was pending and pointer was successfully added to the"
+ + " queue but could not be removed: " + pointer);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Commited Put " + pointer);
+ }
+ }
+ } else if (type == TransactionEventRecord.Type.TAKE.get()) {
+ for (FlumeEventPointer pointer : pointers) {
+ boolean removed = queue.remove(pointer);
+ if (!removed) {
+ pendingTakes.add(pointer.toLong());
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Unable to remove " + pointer + " added to pending list");
+ }
+ }
+ }
+ } else {
+ Preconditions.checkArgument(false,
+ "Unknown record type: " + Integer.toHexString(type));
+ }
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java Sat May 5 20:34:58 2012
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a Rollback on disk
+ */
+class Rollback extends TransactionEventRecord {
+ Rollback(Long transactionID) {
+ super(transactionID);
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ }
+ @Override
+ short getRecordType() {
+ return Type.ROLLBACK.get();
+ }
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java Sat May 5 20:34:58 2012
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a Take on disk
+ */
+class Take extends TransactionEventRecord {
+ private int offset;
+ private int fileID;
+ Take(Long transactionID) {
+ super(transactionID);
+ }
+ Take(Long transactionID, int offset, int fileID) {
+ this(transactionID);
+ this.offset = offset;
+ this.fileID = fileID;
+ }
+ int getOffset() {
+ return offset;
+ }
+
+ int getFileID() {
+ return fileID;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ offset = in.readInt();
+ fileID = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeInt(offset);
+ out.writeInt(fileID);
+ }
+ @Override
+ short getRecordType() {
+ return Type.TAKE.get();
+ }
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java Sat May 5 20:34:58 2012
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Base class for records in data file: Put, Take, Rollback, Commit
+ */
+abstract class TransactionEventRecord implements Writable {
+ private final long transactionID;
+ private long timestamp;
+
+ protected TransactionEventRecord(long transactionID) {
+ this.transactionID = transactionID;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ timestamp = in.readLong();
+ }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(timestamp);
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+ public long getTimestamp() {
+ return timestamp;
+ }
+ long getTransactionID() {
+ return transactionID;
+ }
+
+ abstract short getRecordType();
+
+
+ /**
+ * Provides a minimum guarantee we are not reading complete junk
+ */
+ static final int MAGIC_HEADER = 0xdeadbeef;
+
+ static enum Type {
+ PUT((short)1),
+ TAKE((short)2),
+ ROLLBACK((short)3),
+ COMMIT((short)4);
+
+ private short id;
+ Type(short id) {
+ this.id = id;
+ }
+ public short get() {
+ return id;
+ }
+ }
+ private static final ImmutableMap<Short, Constructor<? extends TransactionEventRecord>> TYPES;
+
+ static {
+ ImmutableMap.Builder<Short, Constructor<? extends TransactionEventRecord>> builder =
+ ImmutableMap.<Short, Constructor<? extends TransactionEventRecord>>builder();
+ try {
+ builder.put(Type.PUT.get(),
+ Put.class.getDeclaredConstructor(Long.class));
+ builder.put(Type.TAKE.get(),
+ Take.class.getDeclaredConstructor(Long.class));
+ builder.put(Type.ROLLBACK.get(),
+ Rollback.class.getDeclaredConstructor(Long.class));
+ builder.put(Type.COMMIT.get(),
+ Commit.class.getDeclaredConstructor(Long.class));
+ } catch (Exception e) {
+ Throwables.propagate(e);
+ }
+ TYPES = builder.build();
+ }
+
+
+ static ByteBuffer toByteBuffer(TransactionEventRecord record) {
+ ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(512);
+ DataOutputStream dataOutput = new DataOutputStream(byteOutput);
+ try {
+ dataOutput.writeInt(MAGIC_HEADER);
+ dataOutput.writeShort(record.getRecordType());
+ dataOutput.writeLong(record.getTransactionID());
+ record.write(dataOutput);
+ dataOutput.flush();
+ // TODO toByteArray does an unneeded copy
+ return ByteBuffer.wrap(byteOutput.toByteArray());
+ } catch(IOException e) {
+ // near impossible
+ Throwables.propagate(e);
+ } finally {
+ if(dataOutput != null) {
+ try {
+ dataOutput.close();
+ } catch (IOException e) {}
+ }
+ }
+ throw new IllegalStateException(
+ "Should not occur as method should return or throw an exception");
+ }
+
+ static TransactionEventRecord fromDataInput(DataInput in) throws IOException {
+ int header = in.readInt();
+ if(header != MAGIC_HEADER) {
+ throw new IOException("Header " + Integer.toHexString(header) +
+ " not expected value: " + Integer.toHexString(MAGIC_HEADER));
+ }
+ short type = in.readShort();
+ long transactionID = in.readLong();
+ TransactionEventRecord entry = newRecordForType(type, transactionID);
+ entry.readFields(in);
+ return entry;
+ }
+
+ static String getName(short type) {
+ Constructor<? extends TransactionEventRecord> constructor = TYPES.get(type);
+ Preconditions.checkNotNull(constructor, "Unknown action " +
+ Integer.toHexString(type));
+ return constructor.getDeclaringClass().getSimpleName();
+ }
+
+ private static TransactionEventRecord newRecordForType(short type, long transactionID) {
+ Constructor<? extends TransactionEventRecord> constructor = TYPES.get(type);
+ Preconditions.checkNotNull(constructor, "Unknown action " +
+ Integer.toHexString(type));
+ try {
+ return constructor.newInstance(transactionID);
+ } catch (Exception e) {
+ Throwables.propagate(e);
+ }
+ throw new IllegalStateException(
+ "Should not occur as method should return or throw an exception");
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java Sat May 5 20:34:58 2012
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.util.List;
+
+import org.apache.flume.Sink;
+
+import com.google.common.collect.Lists;
+
+public class CountingSinkRunner extends Thread {
+ private int count;
+ private final int until;
+ private final Sink sink;
+ private volatile boolean run;
+ private final List<Exception> errors = Lists.newArrayList();
+ public CountingSinkRunner(Sink sink) {
+ this(sink, Integer.MAX_VALUE);
+ }
+ public CountingSinkRunner(Sink sink, int until) {
+ this.sink = sink;
+ this.until = until;
+ }
+ @Override
+ public void run() {
+ run = true;
+ while(run && count < until) {
+ boolean error = true;
+ try {
+ if(Sink.Status.READY.equals(sink.process())) {
+ count++;
+ error = false;
+ }
+ } catch(Exception ex) {
+ errors.add(ex);
+ }
+ if(error) {
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {}
+ }
+ }
+ }
+ public void shutdown() {
+ run = false;
+ }
+ public int getCount() {
+ return count;
+ }
+ public List<Exception> getErrors() {
+ return errors;
+ }
+}
\ No newline at end of file
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java Sat May 5 20:34:58 2012
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.util.List;
+
+import org.apache.flume.Channel;
+import org.apache.flume.PollableSource;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+
+import com.google.common.collect.Lists;
+
+public class CountingSourceRunner extends Thread {
+ private int count;
+ private final int until;
+ private final PollableSource source;
+ private volatile boolean run;
+ private final List<Exception> errors = Lists.newArrayList();
+ public CountingSourceRunner(PollableSource source) {
+ this(source, Integer.MAX_VALUE);
+ }
+ public CountingSourceRunner(PollableSource source, int until) {
+ this(source, until, null);
+ }
+ public CountingSourceRunner(PollableSource source, Channel channel) {
+ this(source, Integer.MAX_VALUE, channel);
+ }
+ public CountingSourceRunner(PollableSource source, int until, Channel channel) {
+ this.source = source;
+ this.until = until;
+ if(channel != null) {
+ ReplicatingChannelSelector selector = new ReplicatingChannelSelector();
+ List<Channel> channels = Lists.newArrayList();
+ channels.add(channel);
+ selector.setChannels(channels);
+ this.source.setChannelProcessor(new ChannelProcessor(selector));
+ }
+ }
+ @Override
+ public void run() {
+ run = true;
+ while(run && count < until) {
+ boolean error = true;
+ try {
+ if(PollableSource.Status.READY.equals(source.process())) {
+ count++;
+ error = false;
+ }
+ } catch(Exception ex) {
+ errors.add(ex);
+ }
+ if(error) {
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {}
+ }
+ }
+ }
+ public void shutdown() {
+ run = false;
+ }
+ public int getCount() {
+ return count;
+ }
+ public List<Exception> getErrors() {
+ return errors;
+ }
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java Sat May 5 20:34:58 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCheckpoint {
+
+ File file;
+ @Before
+ public void setup() throws IOException {
+ file = File.createTempFile("Checkpoint", "");
+ Assert.assertTrue(file.isFile());
+ Assert.assertTrue(file.canWrite());
+ }
+ @After
+ public void cleanup() {
+ file.delete();
+ }
+ @Test
+ public void testSerialization() throws IOException {
+ FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20);
+ FlumeEventQueue queueIn = new FlumeEventQueue(1);
+ queueIn.addHead(ptrIn);
+ Checkpoint checkpoint = new Checkpoint(file, 1);
+ Assert.assertEquals(0, checkpoint.getTimestamp());
+ checkpoint.write(queueIn);
+ FlumeEventQueue queueOut = checkpoint.read();
+ FlumeEventPointer ptrOut = queueOut.removeHead();
+ Assert.assertEquals(ptrIn, ptrOut);
+ Assert.assertTrue(checkpoint.getTimestamp() > 0);
+ }
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java?rev=1334488&r1=1334487&r2=1334488&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java Sat May 5 20:34:58 2012
@@ -21,191 +21,432 @@ package org.apache.flume.channel.file;
import java.io.File;
import java.io.IOException;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
-import org.apache.flume.channel.file.FileChannel.FileBackedTransaction;
+import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
+import org.apache.flume.sink.LoggerSink;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.SequenceGeneratorSource;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
public class TestFileChannel {
- private static final Logger logger = LoggerFactory
+ private static final Logger LOG = LoggerFactory
.getLogger(TestFileChannel.class);
private FileChannel channel;
+ private File checkpointDir;
+ private File[] dataDirs;
+ private String dataDir;
+ private final Context context = new Context();
@Before
- public void setUp() {
- channel = new FileChannel();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testNoDirectory() {
- Event event = EventBuilder.withBody("Test event".getBytes());
+ public void setup() {
+ checkpointDir = Files.createTempDir();
+ dataDirs = new File[3];
+ dataDir = "";
+ for (int i = 0; i < dataDirs.length; i++) {
+ dataDirs[i] = Files.createTempDir();
+ Assert.assertTrue(dataDirs[i].isDirectory());
+ dataDir += dataDirs[i].getAbsolutePath() + ",";
+ }
+ dataDir = dataDir.substring(0, dataDir.length() - 1);
+ channel = createFileChannel(1000);
- channel.put(event);
}
-
- @Test(expected = IllegalStateException.class)
- public void testNonExistantParent() {
- Event event = EventBuilder.withBody("Test event".getBytes());
-
- channel.setDirectory(new File("/i/do/not/exist"));
- channel.put(event);
+ private FileChannel createFileChannel() {
+ return createFileChannel(FileChannelConfiguration.DEFAULT_CAPACITY);
+ }
+ private FileChannel createFileChannel(int capacity) {
+ FileChannel channel = new FileChannel();
+ context.put(FileChannelConfiguration.CHECKPOINT_DIR,
+ checkpointDir.getAbsolutePath());
+ context.put(FileChannelConfiguration.DATA_DIRS, dataDir);
+ context.put(FileChannelConfiguration.CAPACITY, String.valueOf(capacity));
+ Configurables.configure(channel, context);
+ channel.start();
+ return channel;
+ }
+ @After
+ public void teardown() {
+ if(channel != null) {
+ channel.stop();
+ }
+ FileUtils.deleteQuietly(checkpointDir);
+ for (int i = 0; i < dataDirs.length; i++) {
+ FileUtils.deleteQuietly(dataDirs[i]);
+ }
}
-
@Test
- public void testGetTransaction() throws IOException {
- File tmpDir = new File("/tmp/flume-fc-test-" + System.currentTimeMillis());
- FileUtils.forceDeleteOnExit(tmpDir);
-
- channel.setDirectory(tmpDir);
-
- Transaction tx1 = channel.getTransaction();
- Assert.assertNotNull(tx1);
-
- Transaction tx2 = channel.getTransaction();
- Assert.assertNotNull(tx2);
-
- Assert.assertEquals(tx1, tx2);
-
- tx2.begin();
- Assert.assertEquals(tx2, channel.getTransaction());
-
- tx2.rollback();
- Assert.assertEquals(tx2, channel.getTransaction());
-
- tx2.close();
- Assert.assertFalse(tx2.equals(channel.getTransaction()));
+ public void testRestart() throws Exception {
+ List<String> in = Lists.newArrayList();
+ try {
+ while(true) {
+ in.addAll(putEvents(channel, "restart", 1, 1));
+ }
+ } catch (ChannelException e) {
+ Assert.assertEquals("Cannot acquire capacity", e.getMessage());
+ }
+ channel.stop();
+ channel = createFileChannel();
+ List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+ Collections.sort(in);
+ Collections.sort(out);
+ Assert.assertEquals(in, out);
}
-
- /**
- * <p>
- * Ensure two threads calling {@link FileChannel#getTransaction()} get
- * different transactions back.
- * </p>
- *
- */
@Test
- public void testConcurrentGetTransaction() throws IOException,
- InterruptedException, ExecutionException {
- File tmpDir = new File("/tmp/flume-fc-test-" + System.currentTimeMillis());
- FileUtils.forceDeleteOnExit(tmpDir);
- final CyclicBarrier latch = new CyclicBarrier(2);
-
- channel.setDirectory(tmpDir);
-
- Callable<FileBackedTransaction> getTransRunnable = new Callable<FileBackedTransaction>() {
-
- @Override
- public FileBackedTransaction call() {
- Transaction tx = null;
-
- try {
- /*
- * Wait for all threads to pile up to prevent thread reuse in the
- * pool.
- */
- latch.await();
- tx = channel.getTransaction();
- /*
- * This await isn't strictly necessary but it guarantees both threads
- * entered and exited getTransaction() in lock step which simplifies
- * debugging.
- */
- latch.await();
- } catch (InterruptedException e) {
- logger.error("Interrupted while waiting for threads", e);
- Assert.fail();
- } catch (BrokenBarrierException e) {
- logger.error("Barrier broken", e);
- Assert.fail();
- }
-
- return (FileBackedTransaction) tx;
+ public void testReconfigure() throws Exception {
+ List<String> in = Lists.newArrayList();
+ try {
+ while(true) {
+ in.addAll(putEvents(channel, "restart", 1, 1));
}
-
- };
-
- ExecutorService pool = Executors.newFixedThreadPool(2);
-
- Future<FileBackedTransaction> f1 = pool.submit(getTransRunnable);
- Future<FileBackedTransaction> f2 = pool.submit(getTransRunnable);
-
- FileBackedTransaction t1 = f1.get();
- FileBackedTransaction t2 = f2.get();
-
- Assert.assertNotSame("Transactions from different threads are different",
- t1, t2);
+ } catch (ChannelException e) {
+ Assert.assertEquals("Cannot acquire capacity", e.getMessage());
+ }
+ Configurables.configure(channel, context);
+ List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+ Collections.sort(in);
+ Collections.sort(out);
+ Assert.assertEquals(in, out);
}
-
@Test
- public void testPut() throws IOException {
- File tmpDir = new File("/tmp/flume-fc-test-" + System.currentTimeMillis());
- FileUtils.forceDeleteOnExit(tmpDir);
-
- if (!tmpDir.mkdirs()) {
- throw new IOException("Unable to create test directory:" + tmpDir);
+ public void testPut() throws Exception {
+ // should find no items
+ int found = takeEvents(channel, 1, 5).size();
+ Assert.assertEquals(0, found);
+ List<String> expected = Lists.newArrayList();
+ expected.addAll(putEvents(channel, "unbatched", 1, 5));
+ expected.addAll(putEvents(channel, "batched", 5, 5));
+ List<String> actual = takeEvents(channel, 1);
+ Collections.sort(actual);
+ Collections.sort(expected);
+ Assert.assertEquals(expected, actual);
+ }
+ @Test
+ public void testRollbackAfterNoPutTake() throws Exception {
+ Transaction transaction;
+ transaction = channel.getTransaction();
+ transaction.begin();
+ transaction.rollback();
+ transaction.close();
+
+ // ensure we can reopen log with no error
+ channel.stop();
+ channel = createFileChannel();
+ transaction = channel.getTransaction();
+ transaction.begin();
+ Assert.assertNull(channel.take());
+ transaction.commit();
+ transaction.close();
+ }
+ @Test
+ public void testCommitAfterNoPutTake() throws Exception {
+ Transaction transaction;
+ transaction = channel.getTransaction();
+ transaction.begin();
+ transaction.commit();
+ transaction.close();
+
+ // ensure we can reopen log with no error
+ channel.stop();
+ channel = createFileChannel();
+ transaction = channel.getTransaction();
+ transaction.begin();
+ Assert.assertNull(channel.take());
+ transaction.commit();
+ transaction.close();
+ }
+ @Test
+ public void testCapacity() throws Exception {
+ channel.close();
+ channel = createFileChannel(5);
+ try {
+ putEvents(channel, "capacity", 1, 6);
+ } catch (ChannelException e) {
+ Assert.assertEquals("Cannot acquire capacity", e.getMessage());
}
+ // take an event, roll it back, and
+ // then make sure a put fails
+ Transaction transaction;
+ transaction = channel.getTransaction();
+ transaction.begin();
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ transaction.rollback();
+ transaction.close();
+ // ensure the take the didn't change the state of the capacity
+ try {
+ putEvents(channel, "capacity", 1, 1);
+ } catch (ChannelException e) {
+ Assert.assertEquals("Cannot acquire capacity", e.getMessage());
+ }
+ // ensure we the events back
+ Assert.assertEquals(5, takeEvents(channel, 1, 5).size());
+ }
+ @Test
+ public void testRollbackSimulatedCrash() throws Exception {
+ int numEvents = 50;
+ List<String> in = putEvents(channel, "rollback", 1, numEvents);
+
+ Transaction transaction;
+ // put an item we will rollback
+ transaction = channel.getTransaction();
+ transaction.begin();
+ channel.put(EventBuilder.withBody("rolled back".getBytes(Charsets.UTF_8)));
+ transaction.rollback();
+ transaction.close();
+
+ // simulate crash
+ channel.stop();
+ channel = createFileChannel();
+
+ // we should not get the rolled back item
+ List<String> out = takeEvents(channel, 1, numEvents);
+ Collections.sort(in);
+ Collections.sort(out);
+ Assert.assertEquals(in, out);
+ }
+ @Test
+ public void testRollbackSimulatedCrashWithSink() throws Exception {
+ int numEvents = 100;
- channel.setDirectory(tmpDir);
-
- /* Issue five one record transactions. */
- for (int i = 0; i < 5; i++) {
- Transaction transaction = channel.getTransaction();
+ LoggerSink sink = new LoggerSink();
+ sink.setChannel(channel);
+ // sink will leave one item
+ CountingSinkRunner runner = new CountingSinkRunner(sink, numEvents - 1);
+ runner.start();
+ putEvents(channel, "rollback", 10, numEvents);
+
+ Transaction transaction;
+ // put an item we will rollback
+ transaction = channel.getTransaction();
+ transaction.begin();
+ byte[] bytes = "rolled back".getBytes(Charsets.UTF_8);
+ channel.put(EventBuilder.withBody(bytes));
+ transaction.rollback();
+ transaction.close();
- Assert.assertNotNull(transaction);
+ while(runner.isAlive()) {
+ Thread.sleep(10L);
+ }
+ Assert.assertEquals(numEvents - 1, runner.getCount());
+ for(Exception ex : runner.getErrors()) {
+ LOG.warn("Sink had error", ex);
+ }
+ Assert.assertEquals(Collections.EMPTY_LIST, runner.getErrors());
- try {
+ // simulate crash
+ channel.stop();
+ channel = createFileChannel();
+
+ List<String> out = takeEvents(channel, 1, 1);
+ Assert.assertEquals(1, out.size());
+ Assert.assertEquals("rollback-90-9", out.get(0));
+ }
+ @Test
+ public void testThreaded() throws IOException, InterruptedException {
+ int numThreads = 10;
+ final CountDownLatch startLatch = new CountDownLatch(numThreads * 2);
+ final CountDownLatch stopLatch = new CountDownLatch(numThreads * 2);
+ final List<Exception> errors = Collections
+ .synchronizedList(new ArrayList<Exception>());
+ final List<String> expected = Collections
+ .synchronizedList(new ArrayList<String>());
+ final List<String> actual = Collections
+ .synchronizedList(new ArrayList<String>());
+ for (int i = 0; i < numThreads; i++) {
+ final int id = i;
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ startLatch.countDown();
+ startLatch.await();
+ if (id % 2 == 0) {
+ expected.addAll(putEvents(channel, Integer.toString(id), 1, 5));
+ } else {
+ expected.addAll(putEvents(channel, Integer.toString(id), 5, 5));
+ }
+ } catch (Exception e) {
+ errors.add(e);
+ } finally {
+ stopLatch.countDown();
+ }
+ }
+ };
+ t.setDaemon(true);
+ t.start();
+ }
+ for (int i = 0; i < numThreads; i++) {
+ final int id = i;
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ startLatch.countDown();
+ startLatch.await();
+ Thread.sleep(100L); // ensure puts have started
+ if (id % 2 == 0) {
+ actual.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
+ } else {
+ actual.addAll(takeEvents(channel, 5, Integer.MAX_VALUE));
+ }
+ } catch (Exception e) {
+ errors.add(e);
+ } finally {
+ stopLatch.countDown();
+ }
+ }
+ };
+ t.setDaemon(true);
+ t.start();
+ }
+ Assert.assertTrue(stopLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertEquals(Collections.EMPTY_LIST, errors);
+ Collections.sort(expected);
+ Collections.sort(actual);
+ Assert.assertEquals(expected, actual);
+ }
+ @Test(expected=IOException.class)
+ public void testLocking() throws IOException {
+ try {
+ createFileChannel();
+ } catch (RuntimeException e) {
+ Throwable cause = e.getCause();
+ Assert.assertNotNull(cause);
+ Assert.assertTrue(cause.getClass().getSimpleName(),
+ cause instanceof IOException);
+ String msg = cause.getMessage();
+ Assert.assertNotNull(msg);
+ Assert.assertTrue(msg.endsWith("The directory is already locked."));
+ throw (IOException)cause;
+ }
+ }
+ @Test
+ public void testIntegration() throws IOException, InterruptedException {
+ // set shorter checkpoint and filesize to ensure
+ // checkpoints and rolls occur during the test
+ context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL,
+ String.valueOf(10L * 1000L));
+ context.put(FileChannelConfiguration.MAX_FILE_SIZE,
+ String.valueOf(1024 * 1024 * 5));
+ // do reconfiguration
+ Configurables.configure(channel, context);
+
+ SequenceGeneratorSource source = new SequenceGeneratorSource();
+ CountingSourceRunner sourceRunner = new CountingSourceRunner(source, channel);
+
+ NullSink sink = new NullSink();
+ sink.setChannel(channel);
+ CountingSinkRunner sinkRunner = new CountingSinkRunner(sink);
+
+ sinkRunner.start();
+ sourceRunner.start();
+ Thread.sleep(TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES));
+ // shutdown source
+ sourceRunner.shutdown();
+ while(sourceRunner.isAlive()) {
+ Thread.sleep(10L);
+ }
+ // wait for queue to clear
+ while(channel.getDepth() > 0) {
+ Thread.sleep(10L);
+ }
+ // shutdown size
+ sinkRunner.shutdown();
+ // wait a few seconds
+ Thread.sleep(TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS));
+ List<File> logs = Lists.newArrayList();
+ for (int i = 0; i < dataDirs.length; i++) {
+ logs.addAll(LogUtils.getLogs(dataDirs[i]));
+ }
+ LOG.info("Total Number of Logs = " + logs.size());
+ for(File logFile : logs) {
+ LOG.info("LogFile = " + logFile);
+ }
+ LOG.info("Source processed " + sinkRunner.getCount());
+ LOG.info("Sink processed " + sourceRunner.getCount());
+ for(Exception ex : sourceRunner.getErrors()) {
+ LOG.warn("Source had error", ex);
+ }
+ for(Exception ex : sinkRunner.getErrors()) {
+ LOG.warn("Sink had error", ex);
+ }
+ Assert.assertEquals(sinkRunner.getCount(), sinkRunner.getCount());
+ Assert.assertEquals(Collections.EMPTY_LIST, sinkRunner.getErrors());
+ Assert.assertEquals(Collections.EMPTY_LIST, sourceRunner.getErrors());
+ }
+ private static List<String> takeEvents(Channel channel,
+ int batchSize) throws Exception {
+ return takeEvents(channel, batchSize, Integer.MAX_VALUE);
+ }
+ private static List<String> takeEvents(Channel channel,
+ int batchSize, int numEvents) throws Exception {
+ List<String> result = Lists.newArrayList();
+ for (int i = 0; i < numEvents; i += batchSize) {
+ for (int j = 0; j < batchSize; j++) {
+ Transaction transaction = channel.getTransaction();
transaction.begin();
-
- Event event = EventBuilder.withBody(("Test event" + i).getBytes());
- channel.put(event);
-
- transaction.commit();
- } catch (Exception e) {
- logger.error(
- "Failed to put event into file channel. Exception follows.", e);
- transaction.rollback();
- Assert.fail();
- } finally {
- transaction.close();
+ try {
+ Event event = channel.take();
+ if(event == null) {
+ transaction.commit();
+ return result;
+ }
+ result.add(new String(event.getBody(), Charsets.UTF_8));
+ transaction.commit();
+ } catch (Exception ex) {
+ transaction.rollback();
+ throw ex;
+ } finally {
+ transaction.close();
+ }
}
}
-
- /* Issue one five record transaction. */
- Transaction transaction = channel.getTransaction();
-
- try {
- transaction.begin();
-
- for (int i = 0; i < 5; i++) {
- Event event = EventBuilder.withBody(("Test event" + i).getBytes());
- channel.put(event);
+ return result;
+ }
+ private static List<String> putEvents(Channel channel, String prefix,
+ int batchSize, int numEvents) throws Exception {
+ List<String> result = Lists.newArrayList();
+ for (int i = 0; i < numEvents; i += batchSize) {
+ for (int j = 0; j < batchSize; j++) {
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ try {
+ String s = prefix + "-" + i +"-" + j;
+ Event event = EventBuilder.withBody(s.getBytes(Charsets.UTF_8));
+ result.add(s);
+ channel.put(event);
+ transaction.commit();
+ } catch (Exception ex) {
+ transaction.rollback();
+ throw ex;
+ } finally {
+ transaction.close();
+ }
}
-
- transaction.commit();
- } catch (Exception e) {
- logger.error("Failed to put event into file channel. Exception follows.",
- e);
- transaction.rollback();
- Assert.fail();
- } finally {
- transaction.close();
}
+ return result;
}
-
-}
+}
\ No newline at end of file
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEvent.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEvent.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEvent.java Sat May 5 20:34:58 2012
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+
+public class TestFlumeEvent {
+
+ @Test
+ public void testBasics() {
+ Map<String, String> headers = Maps.newHashMap();
+ headers.put("key", "value");
+ byte[] body = "flume".getBytes(Charsets.UTF_8);
+ FlumeEvent event = new FlumeEvent(headers, body);
+ Assert.assertEquals(headers, event.getHeaders());
+ Assert.assertTrue(Arrays.equals(body, event.getBody()));
+ }
+
+ @Test
+ public void testSerialization() throws IOException {
+ Map<String, String> headers = Maps.newHashMap();
+ headers.put("key", "value");
+ byte[] body = "flume".getBytes(Charsets.UTF_8);
+ FlumeEvent in = new FlumeEvent(headers, body);
+ FlumeEvent out = FlumeEvent.from(TestUtils.toDataInput(in));
+ Assert.assertEquals(headers, out.getHeaders());
+ Assert.assertTrue(Arrays.equals(body, out.getBody()));
+ in.setHeaders(null);
+ in.setBody(null);
+ out = FlumeEvent.from(TestUtils.toDataInput(in));
+ Assert.assertEquals(Maps.newHashMap(), out.getHeaders());
+ Assert.assertNull(out.getBody());
+ }
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventPointer.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventPointer.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventPointer.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventPointer.java Sat May 5 20:34:58 2012
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+public class TestFlumeEventPointer {
+
+ @Test
+ public void testGetter() {
+ FlumeEventPointer pointer = new FlumeEventPointer(1, 1);
+ Assert.assertEquals(1, pointer.getFileID());
+ Assert.assertEquals(1, pointer.getOffset());
+ }
+ @Test
+ public void testEquals() {
+ FlumeEventPointer pointerA = new FlumeEventPointer(1, 1);
+ FlumeEventPointer pointerB = new FlumeEventPointer(1, 1);
+ Assert.assertEquals(pointerA, pointerB);
+ Assert.assertEquals(pointerB, pointerA);
+ pointerA = new FlumeEventPointer(1, 1);
+ pointerB = new FlumeEventPointer(2, 2);
+ Assert.assertFalse(pointerA.equals(pointerB));
+ Assert.assertFalse(pointerB.equals(pointerA));
+ }
+ @Test
+ public void testHashCode() {
+ FlumeEventPointer pointerA = new FlumeEventPointer(1, 1);
+ FlumeEventPointer pointerB = new FlumeEventPointer(1, 1);
+ Assert.assertEquals(pointerA.hashCode(), pointerB.hashCode());
+ pointerA = new FlumeEventPointer(1, 1);
+ pointerB = new FlumeEventPointer(2, 2);
+ Assert.assertFalse(pointerA.hashCode() == pointerB.hashCode());
+ }
+
+ @Test
+ public void testPack() {
+ FlumeEventPointer pointerA = new FlumeEventPointer(1, 1);
+ FlumeEventPointer pointerB = new FlumeEventPointer(1, 2);
+ Assert.assertEquals(4294967297L, pointerA.toLong());
+ Assert.assertEquals(4294967298L, pointerB.toLong());
+ Assert.assertEquals(pointerA, FlumeEventPointer.fromLong(pointerA.toLong()));
+ Assert.assertEquals(pointerB, FlumeEventPointer.fromLong(pointerB.toLong()));
+ }
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventPointer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java?rev=1334488&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java Sat May 5 20:34:58 2012
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestFlumeEventQueue {
+
+ FlumeEventPointer pointer1 = new FlumeEventPointer(1, 1);
+ FlumeEventPointer pointer2 = new FlumeEventPointer(2, 2);
+ FlumeEventQueue queue;
+ @Before
+ public void setup() {
+ queue = new FlumeEventQueue(1000);
+ }
+ @Test
+ public void testQueueIsEmptyAfterCreation() {
+ Assert.assertNull(queue.removeHead());
+ }
+ @Test
+ public void testCapacity() {
+ queue = new FlumeEventQueue(1);
+ Assert.assertTrue(queue.addTail(pointer1));
+ Assert.assertFalse(queue.addTail(pointer2));
+ }
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalidCapacityZero() {
+ queue = new FlumeEventQueue(0);
+ }
+ @Test(expected=IllegalArgumentException.class)
+ public void testInvalidCapacityNegative() {
+ queue = new FlumeEventQueue(-1);
+ }
+ @Test
+ public void addTail1() {
+ Assert.assertTrue(queue.addTail(pointer1));
+ Assert.assertEquals(pointer1, queue.removeHead());
+ Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+ }
+ @Test
+ public void addTail2() {
+ Assert.assertTrue(queue.addTail(pointer1));
+ Assert.assertTrue(queue.addTail(pointer2));
+ Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
+ Assert.assertEquals(pointer1, queue.removeHead());
+ Assert.assertEquals(Sets.newHashSet(2), queue.getFileIDs());
+ }
+ @Test
+ public void addTailLarge() {
+ int size = 500;
+ Set<Integer> fileIDs = Sets.newHashSet();
+ for (int i = 1; i <= size; i++) {
+ Assert.assertTrue(queue.addTail(new FlumeEventPointer(i, i)));
+ fileIDs.add(i);
+ Assert.assertEquals(fileIDs, queue.getFileIDs());
+ }
+ for (int i = 1; i <= size; i++) {
+ Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead());
+ fileIDs.remove(i);
+ Assert.assertEquals(fileIDs, queue.getFileIDs());
+ }
+ Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+ }
+ @Test
+ public void addHead1() {
+ Assert.assertTrue(queue.addHead(pointer1));
+ Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
+ Assert.assertEquals(pointer1, queue.removeHead());
+ Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+ }
+ @Test
+ public void addHead2() {
+ Assert.assertTrue(queue.addHead(pointer1));
+ Assert.assertTrue(queue.addHead(pointer2));
+ Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
+ Assert.assertEquals(pointer2, queue.removeHead());
+ Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
+ }
+ @Test
+ public void addHeadLarge() {
+ int size = 500;
+ Set<Integer> fileIDs = Sets.newHashSet();
+ for (int i = 1; i <= size; i++) {
+ Assert.assertTrue(queue.addHead(new FlumeEventPointer(i, i)));
+ fileIDs.add(i);
+ Assert.assertEquals(fileIDs, queue.getFileIDs());
+ }
+ for (int i = size; i > 0; i--) {
+ Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead());
+ fileIDs.remove(i);
+ Assert.assertEquals(fileIDs, queue.getFileIDs());
+ }
+ Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+ }
+ @Test
+ public void addTailRemove1() {
+ Assert.assertTrue(queue.addTail(pointer1));
+ Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
+ Assert.assertTrue(queue.remove(pointer1));
+ Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+ Assert.assertNull(queue.removeHead());
+ Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
+ }
+
+ @Test
+ public void addTailRemove2() {
+ Assert.assertTrue(queue.addTail(pointer1));
+ Assert.assertTrue(queue.addTail(pointer2));
+ Assert.assertTrue(queue.remove(pointer1));
+ Assert.assertEquals(pointer2, queue.removeHead());
+ }
+
+ @Test
+ public void addHeadRemove1() {
+ queue.addHead(pointer1);
+ Assert.assertTrue(queue.remove(pointer1));
+ Assert.assertNull(queue.removeHead());
+ }
+ @Test
+ public void addHeadRemove2() {
+ Assert.assertTrue(queue.addHead(pointer1));
+ Assert.assertTrue(queue.addHead(pointer2));
+ Assert.assertTrue(queue.remove(pointer1));
+ Assert.assertEquals(pointer2, queue.removeHead());
+ }
+ @Test
+ public void testWrappingCorrectly() {
+ int size = Integer.MAX_VALUE;
+ for (int i = 1; i <= size; i++) {
+ if(!queue.addHead(new FlumeEventPointer(i, i))) {
+ break;
+ }
+ }
+ for (int i = queue.size()/2; i > 0; i--) {
+ Assert.assertNotNull(queue.removeHead());
+ }
+ // addHead below would throw an IndexOOBounds with
+ // bad version of FlumeEventQueue.convert
+ for (int i = 1; i <= size; i++) {
+ if(!queue.addHead(new FlumeEventPointer(i, i))) {
+ break;
+ }
+ }
+ }
+}
Propchange: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
------------------------------------------------------------------------------
svn:eol-style = native