You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/06/24 09:38:05 UTC
[1/2] FLUME-2094. Remove the deprecated Recoverable Memory Channel.
Updated Branches:
refs/heads/trunk c059c916e -> 08843202f
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index a7aeef4..816a5ed 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1972,31 +1972,6 @@ Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = jdbc
-Recoverable Memory Channel
-~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-
-.. warning:: The Recoverable Memory Channel has been deprecated
- in favor of the FileChannel. FileChannel is durable channel
- and performs better than the Recoverable Memory Channel.
-
-Required properties are in **bold**.
-
-====================== =============================================== =========================================================================
-Property Name Default Description
-====================== =============================================== =========================================================================
-**type** -- The component type name, needs to be
- ``org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel``
-wal.dataDir ${user.home}/.flume/recoverable-memory-channel
-wal.rollSize (0x04000000) Max size (in bytes) of a single file before we roll
-wal.minRetentionPeriod 300000 Min amount of time (in millis) to keep a log
-wal.workerInterval 60000 How often (in millis) the background worker checks for old logs
-wal.maxLogsSize (0x20000000) Total amt (in bytes) of logs to keep, excluding the current log
-capacity 100
-transactionCapacity 100
-keep-alive 3
-====================== =============================================== =========================================================================
-
File Channel
~~~~~~~~~~~~
@@ -3070,7 +3045,6 @@ Component Interface Type Alias
============================================================ ====================== ====================================================================
org.apache.flume.Channel memory org.apache.flume.channel.MemoryChannel
org.apache.flume.Channel jdbc org.apache.flume.channel.jdbc.JdbcChannel
-org.apache.flume.Channel recoverablememory org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
org.apache.flume.Channel file org.apache.flume.channel.file.FileChannel
org.apache.flume.Channel -- org.apache.flume.channel.PseudoTxnMemoryChannel
org.apache.flume.Channel -- org.example.MyChannel
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6846b07..7f6f824 100644
--- a/pom.xml
+++ b/pom.xml
@@ -925,12 +925,6 @@ limitations under the License.
<version>1.4.0-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-channels</groupId>
- <artifactId>flume-recoverable-memory-channel</artifactId>
- <version>1.4.0-SNAPSHOT</version>
- </dependency>
-
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-hdfs-sink</artifactId>
[2/2] git commit: FLUME-2094. Remove the deprecated Recoverable
Memory Channel.
Posted by mp...@apache.org.
FLUME-2094. Remove the deprecated Recoverable Memory Channel.
(Roshan Naik via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/08843202
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/08843202
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/08843202
Branch: refs/heads/trunk
Commit: 08843202f78e0539e7ebb48c1d1a3fe7986db899
Parents: c059c91
Author: Mike Percy <mp...@apache.org>
Authored: Mon Jun 24 00:37:06 2013 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Jun 24 00:37:06 2013 -0700
----------------------------------------------------------------------
.../flume-recoverable-memory-channel/pom.xml | 108 ----
.../memory/RecoverableMemoryChannel.java | 321 -----------
.../memory/RecoverableMemoryChannelEvent.java | 113 ----
.../memory/wal/SequenceIDBuffer.java | 126 -----
.../channel/recoverable/memory/wal/WAL.java | 550 -------------------
.../recoverable/memory/wal/WALDataFile.java | 154 ------
.../recoverable/memory/wal/WALEntry.java | 84 ---
.../recoverable/memory/wal/WALReplayResult.java | 41 --
.../memory/TestRecoverableMemoryChannel.java | 336 -----------
.../memory/wal/TestSequenceIDBuffer.java | 73 ---
.../channel/recoverable/memory/wal/TestWAL.java | 287 ----------
.../src/test/resources/log4j.properties | 25 -
flume-ng-channels/pom.xml | 1 -
.../conf/channel/ChannelConfiguration.java | 10 +-
.../apache/flume/conf/channel/ChannelType.java | 9 +-
flume-ng-dist/pom.xml | 4 -
flume-ng-doc/sphinx/FlumeUserGuide.rst | 26 -
pom.xml | 6 -
18 files changed, 2 insertions(+), 2272 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/pom.xml b/flume-ng-channels/flume-recoverable-memory-channel/pom.xml
deleted file mode 100644
index 80f8d21..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/pom.xml
+++ /dev/null
@@ -1,108 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>flume-ng-channels</artifactId>
- <groupId>org.apache.flume</groupId>
- <version>1.4.0-SNAPSHOT</version>
- </parent>
-
- <groupId>org.apache.flume.flume-ng-channels</groupId>
- <artifactId>flume-recoverable-memory-channel</artifactId>
- <name>Flume NG file backed Memory channel</name>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-configuration</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>${hadoop.common.artifact.id}</artifactId>
- <optional>true</optional>
- </dependency>
-
- </dependencies>
-
- <profiles>
-
- <profile>
- <id>hadoop-2</id>
- <activation>
- <property>
- <name>hadoop.profile</name>
- <value>2</value>
- </property>
- </activation>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- <optional>true</optional>
- </dependency>
- </dependencies>
- </profile>
-
- </profiles>
-
-
-
-</project>
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
deleted file mode 100644
index c7a9947..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.channel.recoverable.memory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.annotations.InterfaceAudience;
-import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.annotations.Disposable;
-import org.apache.flume.channel.BasicChannelSemantics;
-import org.apache.flume.channel.BasicTransactionSemantics;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.recoverable.memory.wal.WAL;
-import org.apache.flume.channel.recoverable.memory.wal.WALEntry;
-import org.apache.flume.channel.recoverable.memory.wal.WALReplayResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-
-/**
- * <p>
- * A durable {@link Channel} implementation that uses the local file system for
- * its storage.
- * </p>
- *
- * @deprecated The RecoverableMemoryChannel has been deprecated in favor of
- * {@link org.apache.flume.channel.file.FileChannel}, which gives better
- * performance and is also durable.
- */
-@Disposable
-@Deprecated
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class RecoverableMemoryChannel extends BasicChannelSemantics {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(RecoverableMemoryChannel.class);
-
-
- public static final String WAL_DATA_DIR = "wal.dataDir";
- public static final String WAL_ROLL_SIZE = "wal.rollSize";
- public static final String WAL_MAX_LOGS_SIZE = "wal.maxLogsSize";
- public static final String WAL_MIN_RETENTION_PERIOD = "wal.minRetentionPeriod";
- public static final String WAL_WORKER_INTERVAL = "wal.workerInterval";
- public static final String CAPACITY = "capacity";
- public static final String KEEPALIVE = "keep-alive";
-
- public static final int DEFAULT_CAPACITY = 100;
- public static final int DEFAULT_KEEPALIVE = 3;
-
- private MemoryChannel memoryChannel = new MemoryChannel();
- private AtomicLong seqidGenerator = new AtomicLong(0);
- private WAL<RecoverableMemoryChannelEvent> wal;
- /**
- * MemoryChannel checks to ensure the capacity is available
- * on commit. That is a problem because we need to write to
- * disk before we commit the data to MemoryChannel. As such
- * we keep track of capacity ourselves.
- */
- private Semaphore queueRemaining;
- private int capacity;
- private int keepAlive;
- private volatile boolean open;
-
- public RecoverableMemoryChannel() {
- open = false;
- }
-
- @Override
- public void configure(Context context) {
- memoryChannel.configure(context);
- int capacity = context.getInteger(CAPACITY, DEFAULT_CAPACITY);
- if(queueRemaining == null) {
- queueRemaining = new Semaphore(capacity, true);
- } else if(capacity > this.capacity) {
- // capacity increase
- queueRemaining.release(capacity - this.capacity);
- } else if(capacity < this.capacity) {
- queueRemaining.acquireUninterruptibly(this.capacity - capacity);
- }
- this.capacity = capacity;
- keepAlive = context.getInteger(KEEPALIVE, DEFAULT_KEEPALIVE);
- long rollSize = context.getLong(WAL_ROLL_SIZE, WAL.DEFAULT_ROLL_SIZE);
- long maxLogsSize = context.getLong(WAL_MAX_LOGS_SIZE, WAL.DEFAULT_MAX_LOGS_SIZE);
- long minLogRetentionPeriod = context.getLong(WAL_MIN_RETENTION_PERIOD, WAL.DEFAULT_MIN_LOG_RETENTION_PERIOD);
- long workerInterval = context.getLong(WAL_WORKER_INTERVAL, WAL.DEFAULT_WORKER_INTERVAL);
- if(wal == null) {
- String homePath = System.getProperty("user.home").replace('\\', '/');
- String dataDir = context.getString(WAL_DATA_DIR, homePath + "/.flume/recoverable-memory-channel");
- try {
- wal = new WAL<RecoverableMemoryChannelEvent>(new File(dataDir),
- RecoverableMemoryChannelEvent.class, rollSize, maxLogsSize,
- minLogRetentionPeriod, workerInterval);
- } catch (IOException e) {
- Throwables.propagate(e);
- }
- } else {
- wal.setRollSize(rollSize);
- wal.setMaxLogsSize(maxLogsSize);
- wal.setMinLogRetentionPeriod(minLogRetentionPeriod);
- wal.setWorkerInterval(workerInterval);
- LOG.warn(this.getClass().getSimpleName() + " only supports " +
- "partial reconfiguration.");
- }
- }
-
- @Override
- public synchronized void start() {
- LOG.info("Starting " + this);
- try {
- WALReplayResult<RecoverableMemoryChannelEvent> results = wal.replay();
- Preconditions.checkArgument(results.getSequenceID() >= 0);
- LOG.info("Replay SequenceID " + results.getSequenceID());
- seqidGenerator.set(results.getSequenceID());
- int numResults = results.getResults().size();
- Preconditions.checkState(numResults <= capacity, "Capacity " + capacity +
- ", but we need to replay " + numResults);
- LOG.info("Replay Events " + numResults);
- for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
- seqidGenerator.set(Math.max(entry.getSequenceID(),seqidGenerator.get()));
- }
- for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
- Transaction transaction = null;
- try {
- transaction = memoryChannel.getTransaction();
- transaction.begin();
- memoryChannel.put(entry.getData());
- transaction.commit();
- } catch(Exception e) {
- if(transaction != null) {
- try {
- transaction.rollback();
- } catch(Exception ex) {
- LOG.info("Error during rollback", ex);
- }
- }
- Throwables.propagate(e);
- } catch(Error e) {
- if(transaction != null) {
- try {
- transaction.rollback();
- } catch(Exception ex) {
- LOG.info("Error during rollback", ex);
- }
- }
- throw e;
- } finally {
- if(transaction != null) {
- transaction.close();
- }
- }
- }
- } catch (IOException e) {
- Throwables.propagate(e);
- }
- super.start();
- open = true;
- }
-
- @Override
- public synchronized void stop() {
- open = false;
- LOG.info("Stopping " + this);
- try {
- close();
- } catch (IOException e) {
- Throwables.propagate(e);
- }
- super.stop();
- }
-
- @Override
- protected BasicTransactionSemantics createTransaction() {
- return new RecoverableMemoryTransaction(this, memoryChannel);
- }
-
- private void commitEvents(List<RecoverableMemoryChannelEvent> events)
- throws IOException {
- List<WALEntry<RecoverableMemoryChannelEvent>> entries = Lists.newArrayList();
- for(RecoverableMemoryChannelEvent event : events) {
- entries.add(new WALEntry<RecoverableMemoryChannelEvent>(event, event.sequenceId));
- }
- wal.writeEntries(entries);
- }
- private void commitSequenceID(List<Long> seqids)
- throws IOException {
- wal.writeSequenceIDs(seqids);
- }
-
- private long nextSequenceID() {
- return seqidGenerator.incrementAndGet();
- }
-
- void close() throws IOException {
- if(wal != null) {
- wal.close();
- }
- }
-
- /**
- * <p>
- * An implementation of {@link Transaction} for {@link RecoverableMemoryChannel}s.
- * </p>
- */
- private static class RecoverableMemoryTransaction extends BasicTransactionSemantics {
-
- private Transaction transaction;
- private MemoryChannel memoryChannel;
- private RecoverableMemoryChannel channel;
- private List<Long> sequenceIds = Lists.newArrayList();
- private List<RecoverableMemoryChannelEvent> events = Lists.newArrayList();
- private int takes;
-
- private RecoverableMemoryTransaction(RecoverableMemoryChannel channel,
- MemoryChannel memoryChannel) {
- this.channel = channel;
- this.memoryChannel = memoryChannel;
- this.transaction = this.memoryChannel.getTransaction();
- this.takes = 0;
- }
- @Override
- protected void doBegin() throws InterruptedException {
- transaction.begin();
- }
- @Override
- protected void doPut(Event event) throws InterruptedException {
- if(!channel.open) {
- throw new ChannelException("Channel not open");
- }
- if(!channel.queueRemaining.tryAcquire(channel.keepAlive, TimeUnit.SECONDS)) {
- throw new ChannelException("Cannot acquire capacity");
- }
- RecoverableMemoryChannelEvent sequencedEvent =
- new RecoverableMemoryChannelEvent(event, channel.nextSequenceID());
- memoryChannel.put(sequencedEvent);
- events.add(sequencedEvent);
- }
-
- @Override
- protected Event doTake() throws InterruptedException {
- if(!channel.open) {
- throw new ChannelException("Channel not open");
- }
- RecoverableMemoryChannelEvent event = (RecoverableMemoryChannelEvent)memoryChannel.take();
- if(event != null) {
- sequenceIds.add(event.sequenceId);
- takes++;
- return event.event;
- }
- return null;
- }
-
- @Override
- protected void doCommit() throws InterruptedException {
- if(!channel.open) {
- throw new ChannelException("Channel not open");
- }
- if(sequenceIds.size() > 0) {
- try {
- channel.commitSequenceID(sequenceIds);
- } catch (IOException e) {
- throw new ChannelException("Unable to commit", e);
- }
- }
- if(!events.isEmpty()) {
- try {
- channel.commitEvents(events);
- } catch (IOException e) {
- throw new ChannelException("Unable to commit", e);
- }
- }
- transaction.commit();
- channel.queueRemaining.release(takes);
- }
-
- @Override
- protected void doRollback() throws InterruptedException {
- sequenceIds.clear();
- events.clear();
- channel.queueRemaining.release(events.size());
- transaction.rollback();
- }
-
- @Override
- protected void doClose() {
- sequenceIds.clear();
- events.clear();
- transaction.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java
deleted file mode 100644
index 1fd0d33..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.flume.Event;
-import org.apache.flume.event.SimpleEvent;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Maps;
-
-class RecoverableMemoryChannelEvent implements Event, Writable {
- Event event;
- long sequenceId;
-
- // called via reflection
- @SuppressWarnings("unused")
- private RecoverableMemoryChannelEvent() {
- this.event = new SimpleEvent();
- }
-
- RecoverableMemoryChannelEvent(Event event, long sequenceId) {
- this.event = event;
- this.sequenceId = sequenceId;
- }
- @Override
- public Map<String, String> getHeaders() {
- return event.getHeaders();
- }
- @Override
- public void setHeaders(Map<String, String> headers) {
- event.setHeaders(headers);
- }
- @Override
- public byte[] getBody() {
- return event.getBody();
- }
- @Override
- public void setBody(byte[] body) {
- event.setBody(body);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(sequenceId);
- MapWritable map = toMapWritable(getHeaders());
- map.write(out);
- byte[] body = getBody();
- if(body == null) {
- out.writeInt(-1);
- } else {
- out.writeInt(body.length);
- out.write(body);
- }
- }
-
-
-
- @Override
- public void readFields(DataInput in) throws IOException {
- sequenceId = in.readLong();
- MapWritable map = new MapWritable();
- map.readFields(in);
- setHeaders(fromMapWritable(map));
- byte[] body = null;
- int bodyLength = in.readInt();
- if(bodyLength != -1) {
- body = new byte[bodyLength];
- in.readFully(body);
- }
- setBody(body);
- }
- private MapWritable toMapWritable(Map<String, String> map) {
- MapWritable result = new MapWritable();
- if(map != null) {
- for(Map.Entry<String, String> entry : map.entrySet()) {
- result.put(new Text(entry.getKey()),new Text(entry.getValue()));
- }
- }
- return result;
- }
- private Map<String, String> fromMapWritable(MapWritable map) {
- Map<String, String> result = Maps.newHashMap();
- if(map != null) {
- for(Map.Entry<Writable, Writable> entry : map.entrySet()) {
- result.put(entry.getKey().toString(),entry.getValue().toString());
- }
- }
- return result;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
deleted file mode 100644
index 02b0f81..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.nio.ByteBuffer;
-import java.nio.LongBuffer;
-
-import org.apache.flume.tools.DirectMemoryUtils;
-import org.apache.hadoop.util.IndexedSortable;
-import org.apache.hadoop.util.QuickSort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Throwables;
-
-public class SequenceIDBuffer implements IndexedSortable {
- private static final Logger LOG =
- LoggerFactory.getLogger(SequenceIDBuffer.class);
- private static final int SIZE_OF_LONG = 8;
- private LongBuffer buffer;
- private ByteBuffer backingBuffer;
-
- public SequenceIDBuffer(int size) {
- int bytesRequired = size * SIZE_OF_LONG;
- backingBuffer = DirectMemoryUtils.allocate(bytesRequired);
- buffer = backingBuffer.asLongBuffer();
- }
-
- @Override
- public int compare(int leftIndex, int rightIndex) {
- long left = get(leftIndex);
- long right = get(rightIndex);
- return (left < right ? -1 : (left == right ? 0 : 1));
-
- }
-
- public boolean exists(long value) {
- return binarySearch(value) >= 0;
- }
-
- private int binarySearch(long value) {
- int low = 0;
- int high = size() - 1;
-
- while (low <= high) {
- int mid = (low + high) >>> 1;
- long midVal = get(mid);
-
- if (midVal < value) {
- low = mid + 1;
- } else if (midVal > value) {
- high = mid - 1;
- } else {
- return mid; // key found
- }
- }
- return -(low + 1); // key not found.
- }
-
- @Override
- public void swap(int leftIndex, int rightIndex) {
- long left = get(leftIndex);
- long right = get(rightIndex);
- put(leftIndex, right);
- put(rightIndex, left);
- }
-
- public long get(int index) {
- return buffer.get(index);
- }
-
- public void put(int index, long value) {
- buffer.put(index, value);
- }
-
- public int size() {
- return buffer.limit();
- }
-
- public void close() {
- try {
- DirectMemoryUtils.clean(backingBuffer);
- } catch (Exception e) {
- LOG.warn("Error cleaning up buffer", e);
- if (LOG.isDebugEnabled()) {
- Throwables.propagate(e);
- }
- }
- }
-
- public void sort() {
- QuickSort quickSort = new QuickSort();
- quickSort.sort(this, 0, size());
- }
-
- public static void main(String[] args) throws Exception {
- try {
- System.out.println("SequenceIDBuffer");
- SequenceIDBuffer buffer = new SequenceIDBuffer(13107200);
- buffer.close();
- System.out.println("Array");
- @SuppressWarnings("unused")
- long[] array = new long[13107200];
- } catch (Throwable t) {
- t.printStackTrace();
- } finally {
- Thread.sleep(Long.MAX_VALUE);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
deleted file mode 100644
index 223fba5..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
+++ /dev/null
@@ -1,550 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-
-/**
- * Provides Write Ahead Log functionality for a generic Writable. All entries
- * stored in the WAL must be assigned a unique increasing sequence id. WAL
- * files will be removed when the following condition holds (defaults):
- *
- * At least 512MB of WAL's exist, the file in question is greater than
- * five minutes old and the largest committed sequence id is greater
- * than the largest sequence id in the file.
- *
- * <pre>
- * WAL wal = new WAL(path, Writable.class);
- * wal.writeEvent(event, 1);
- * wal.writeEvent(event, 2);
- * wal.writeSequenceID(1);
- * wal.writeEvent(event, 3);
- *
- * System crashes or shuts down...
- *
- * WAL wal = new WAL(path, Writable.class);
- * [Event 2, Event 3] = wal.replay();
- * </pre>
- *
- * WAL files will be created in the specified data directory. They will be
- * rolled at 64MB and deleted five minutes after they are no longer needed.
- * that is the current sequence id) is greater than the greatest sequence id
- * in the file.
- *
- * The only synchronization this class does is around rolling log files. When
- * a roll of the log file is required, the thread which discovers this
- * will execute the roll. Any threads calling a write*() method during
- * the roll will block until the roll is complete.
- */
-public class WAL<T extends Writable> implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(WAL.class);
-
- private File path;
- private File dataPath;
- private File sequenceIDPath;
- private Class<T> clazz;
- private WALDataFile.Writer<T> dataFileWALWriter;
- private WALDataFile.Writer<NullWritable> sequenceIDWALWriter;
- private Map<String, Long> fileLargestSequenceIDMap = Collections
- .synchronizedMap(new HashMap<String, Long>());
- private AtomicLong largestCommitedSequenceID = new AtomicLong(0);
- private volatile boolean rollRequired;
- private volatile boolean rollInProgress;
- private volatile long rollSize;
- private volatile long maxLogsSize;
- private volatile long minLogRetentionPeriod;
- private volatile long workerInterval;
- private int numReplaySequenceIDOverride;
- private Worker backgroundWorker;
-
- /**
- * Number of bytes before we roll the file.
- */
- public static final long DEFAULT_ROLL_SIZE = 1024L * 1024L * 64L;
- /**
- * Number of bytes, to keep before we start pruning logs.
- */
- public static final long DEFAULT_MAX_LOGS_SIZE = 1024L * 1024L * 512L;
- /**
- * Minimum number of ms to keep a log file.
- */
- public static final long DEFAULT_MIN_LOG_RETENTION_PERIOD = 5L * 60L * 1000L;
- /**
- * How often in ms the background worker runs
- */
- public static final long DEFAULT_WORKER_INTERVAL = 60L * 1000L;
-
- // used for testing only
- WAL(File path, Class<T> clazz) throws IOException {
- this(path, clazz, DEFAULT_ROLL_SIZE, DEFAULT_MAX_LOGS_SIZE,
- DEFAULT_MIN_LOG_RETENTION_PERIOD, DEFAULT_WORKER_INTERVAL);
- }
-
- /**
- * Creates a wal object with no defaults, using the specified parameters in
- * the constructor for operation.
- *
- * @param path
- * @param clazz
- * @param rollSize
- * bytes - max size of a single file before we roll
- * @param maxLogsSize
- * bytes - total amount of logs to keep excluding the current log
- * @param minLogRentionPeriod
- * ms - minimum amount of time to keep a log
- * @param workerInterval
- * ms - how often the background worker checks for old logs
- * @throws IOException
- */
- public WAL(File path, Class<T> clazz, long rollSize,
- long maxLogsSize, long minLogRentionPeriod,
- long workerInterval) throws IOException {
- this.path = path;
- this.rollSize = rollSize;
- this.maxLogsSize = maxLogsSize;
- this.minLogRetentionPeriod = minLogRentionPeriod;
- this.workerInterval = workerInterval;
-
- StringBuffer buffer = new StringBuffer();
- buffer.append("path = ").append(path).append(", ");
- buffer.append("rollSize = ").append(rollSize).append(", ");
- buffer.append("maxLogsSize = ").append(maxLogsSize).append(", ");
- buffer.append("minLogRentionPeriod = ").append(minLogRentionPeriod).append(", ");
- buffer.append("workerInterval = ").append(workerInterval);
- LOG.info("WAL Parameters: " + buffer);
-
- File clazzNamePath = new File(path, "clazz");
- createOrDie(path);
- if (clazzNamePath.exists()) {
- String clazzName = Files.readFirstLine(clazzNamePath, Charsets.UTF_8);
- if (!clazzName.equals(clazz.getName())) {
- throw new IOException("WAL is for " + clazzName
- + " and you are passing " + clazz.getName());
- }
- } else {
- Files.write(clazz.getName().getBytes(Charsets.UTF_8), clazzNamePath);
- }
-
- dataPath = new File(path, "data");
- sequenceIDPath = new File(path, "seq");
- createOrDie(dataPath);
- createOrDie(sequenceIDPath);
- this.clazz = clazz;
-
- backgroundWorker = new Worker(this);
- backgroundWorker.setName("WAL-Worker-" + path.getAbsolutePath());
- backgroundWorker.setDaemon(true);
- backgroundWorker.start();
-
- roll();
- }
-
- private void roll() throws IOException {
- try {
- rollInProgress = true;
- LOG.info("Rolling WAL " + this.path);
- if (dataFileWALWriter != null) {
- fileLargestSequenceIDMap.put(dataFileWALWriter.getPath()
- .getAbsolutePath(), dataFileWALWriter.getLargestSequenceID());
- dataFileWALWriter.close();
- }
- if (sequenceIDWALWriter != null) {
- fileLargestSequenceIDMap.put(sequenceIDWALWriter.getPath()
- .getAbsolutePath(), sequenceIDWALWriter.getLargestSequenceID());
- sequenceIDWALWriter.close();
- }
- long ts = System.currentTimeMillis();
- File dataWalFileName = new File(dataPath, Long.toString(ts));
- File seqWalFileName = new File(sequenceIDPath, Long.toString(ts));
- while (dataWalFileName.exists() || seqWalFileName.exists()) {
- ts++;
- dataWalFileName = new File(dataPath, Long.toString(ts));
- seqWalFileName = new File(sequenceIDPath, Long.toString(ts));
- }
-
- dataFileWALWriter = new WALDataFile.Writer<T>(dataWalFileName);
- sequenceIDWALWriter = new WALDataFile.Writer<NullWritable>(seqWalFileName);
- rollRequired = false;
- } finally {
- rollInProgress = false;
- // already have lock but is more clear
- synchronized (this) {
- notifyAll();
- }
- }
- }
-
- public WALReplayResult<T> replay() throws IOException {
- final AtomicLong sequenceID = new AtomicLong(0);
- final Map<String, Long> fileLargestSequenceIDMap = Maps.newHashMap();
- final AtomicLong totalBytes = new AtomicLong(0);
- // first get the total amount of data we have to read in
- readFiles(sequenceIDPath, new Function<File, Void>() {
- @Override
- public Void apply(File input) {
- totalBytes.addAndGet(input.length());
- return null;
- }
- });
- // then estimate the size of the array
- // needed to hold all the sequence ids
- int baseSize = WALEntry.getBaseSize();
- int numEntries = Math.max((int)((totalBytes.get() / baseSize) * 1.05f) + 1,
- numReplaySequenceIDOverride);
- LOG.info("Replay assumptions: baseSize = " + baseSize
- + ", estimatedNumEntries " + numEntries);
- final SequenceIDBuffer sequenceIDs = new SequenceIDBuffer(numEntries);
-
- // read them all into ram
- final AtomicInteger index = new AtomicInteger(0);
- readFiles(sequenceIDPath, new Function<File, Void>() {
- @Override
- public Void apply(File input) {
- LOG.info("Replaying " + input);
- WALDataFile.Reader<NullWritable> reader = null;
- int localIndex = index.get();
- try {
- // item stored is a NullWritable so we only store the base WALEntry
- reader = new WALDataFile.Reader<NullWritable>(input,
- NullWritable.class);
- List<WALEntry<NullWritable>> batch;
- long largestForFile = Long.MIN_VALUE;
- while ((batch = reader.nextBatch()) != null) {
- for(WALEntry<NullWritable> entry : batch) {
- long current = entry.getSequenceID();
- sequenceIDs.put(localIndex++, current);
- largestForFile = Math.max(largestForFile, current);
- }
- }
- sequenceID.set(Math.max(largestForFile, sequenceID.get()));
- fileLargestSequenceIDMap.put(input.getAbsolutePath(),
- largestForFile);
- } catch (IOException e) {
- Throwables.propagate(e);
- } finally {
- index.set(localIndex);
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- }
- }
- }
- return null;
- }
- });
-
- sequenceIDs.sort();
-
- // now read all edits storing items with a sequence id
- // which is *not* in the sequenceIDs
- final List<WALEntry<T>> entries = Lists.newArrayList();
- final Class<T> dataClazz = clazz;
- readFiles(dataPath, new Function<File, Void>() {
- @Override
- public Void apply(File input) {
- LOG.info("Replaying " + input);
- WALDataFile.Reader<T> reader = null;
- try {
- reader = new WALDataFile.Reader<T>(input, dataClazz);
- List<WALEntry<T>> batch = Lists.newArrayList();
- long largestForFile = Long.MIN_VALUE;
- while ((batch = reader.nextBatch()) != null) {
- for(WALEntry<T> entry : batch) {
- long current = entry.getSequenceID();
- if (!sequenceIDs.exists(current)) {
- entries.add(entry);
- }
- largestForFile = Math.max(largestForFile, current);
- }
- }
- sequenceID.set(Math.max(largestForFile, sequenceID.get()));
- fileLargestSequenceIDMap.put(input.getAbsolutePath(),
- largestForFile);
- } catch (IOException e) {
- Throwables.propagate(e);
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- }
- }
- }
- return null;
- }
- });
- sequenceIDs.close();
- synchronized (this.fileLargestSequenceIDMap) {
- this.fileLargestSequenceIDMap.clear();
- this.fileLargestSequenceIDMap.putAll(fileLargestSequenceIDMap);
- LOG.info("SequenceIDMap " + fileLargestSequenceIDMap);
- }
- largestCommitedSequenceID.set(sequenceID.get());
- LOG.info("Replay complete: LargestCommitedSequenceID = " + largestCommitedSequenceID.get());
- return new WALReplayResult<T>(entries, largestCommitedSequenceID.get());
- }
-
- public void writeEntries(List<WALEntry<T>> entries) throws IOException {
- Preconditions.checkNotNull(dataFileWALWriter,
- "Write is null, close must have been called");
- synchronized (this) {
- if (isRollRequired()) {
- roll();
- }
- }
- waitWhileRolling();
- boolean error = true;
- try {
- dataFileWALWriter.append(entries);
- error = false;
- } finally {
- if (error) {
- rollRequired = true;
- }
- }
- }
-
- public void writeEntry(WALEntry<T> entry) throws IOException {
- List<WALEntry<T>> entries = Lists.newArrayList();
- entries.add(entry);
- writeEntries(entries);
- }
-
- public void writeSequenceID(long sequenceID) throws IOException {
- List<Long> sequenceIDs = Lists.newArrayList();
- sequenceIDs.add(sequenceID);
- writeSequenceIDs(sequenceIDs);
- }
- public void writeSequenceIDs(List<Long> sequenceIDs) throws IOException {
- Preconditions.checkNotNull(sequenceIDWALWriter,
- "Write is null, close must have been called");
- synchronized (this) {
- if (isRollRequired()) {
- roll();
- }
- }
- waitWhileRolling();
- boolean error = true;
- try {
- List<WALEntry<NullWritable>> entries = Lists.newArrayList();
- for(Long sequenceID : sequenceIDs) {
- largestCommitedSequenceID.set(Math.max(sequenceID,
- largestCommitedSequenceID.get()));
- entries.add(new WALEntry<NullWritable>(NullWritable.get(), sequenceID));
- sequenceIDWALWriter.append(entries);
- }
- error = false;
- } finally {
- if (error) {
- rollRequired = true;
- }
- }
- }
-
- private void waitWhileRolling() {
- synchronized (this) {
- while (rollInProgress) {
- try {
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
-
- @Override
- public void close() throws IOException {
- if (backgroundWorker != null) {
- backgroundWorker.shutdown();
- }
- if (sequenceIDWALWriter != null) {
- sequenceIDWALWriter.close();
- sequenceIDWALWriter = null;
- }
- if (dataFileWALWriter != null) {
- dataFileWALWriter.close();
- dataFileWALWriter = null;
- }
- }
-
- private boolean isRollRequired() throws IOException {
- if (rollRequired) {
- return true;
- }
- return Math.max(dataFileWALWriter.getSize(), sequenceIDWALWriter.getSize()) > rollSize;
- }
-
- private void readFiles(File path, Function<File, Void> function)
- throws IOException {
- File[] dataFiles = path.listFiles();
- List<File> files = Lists.newArrayList();
- if (dataFiles != null) {
- for (File dataFile : dataFiles) {
- if (!dataFile.isFile()) {
- throw new IOException("Not file " + dataFile);
- }
- files.add(dataFile);
- }
- }
- for (File dataFile : files) {
- function.apply(dataFile);
- }
- }
-
- private void createOrDie(File path) throws IOException {
- if (!path.isDirectory()) {
- if (!path.mkdirs()) {
- throw new IOException("Unable to create " + path);
- }
- }
- }
-
- private static class Worker extends Thread {
- private WAL<? extends Writable> wal;
- private volatile boolean run = true;
- public Worker(WAL<? extends Writable> wal) {
- this.wal = wal;
- }
-
- @Override
- public void run() {
- LOG.info("Background worker reporting for duty");
- while (run) {
- try {
- try {
- Thread.sleep(wal.workerInterval);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- if (!run) {
- continue;
- }
- List<String> filesToRemove = Lists.newArrayList();
- long totalSize = 0;
- synchronized (wal.fileLargestSequenceIDMap) {
- for (String key : wal.fileLargestSequenceIDMap.keySet()) {
- File file = new File(key);
- totalSize += file.length();
- }
- if (totalSize >= wal.maxLogsSize) {
- for (String key : wal.fileLargestSequenceIDMap.keySet()) {
- File file = new File(key);
- Long seqid = wal.fileLargestSequenceIDMap.get(key);
- long largestCommitedSeqID = wal.largestCommitedSequenceID.get();
- if (file.exists()
- // has not been modified in 5 minutes
- && System.currentTimeMillis() - file.lastModified() > wal.minLogRetentionPeriod
- // current seqid is greater than the largest seqid in the file
- && largestCommitedSeqID > seqid) {
- filesToRemove.add(key);
- LOG.info("Removing expired file " + key + ", seqid = "
- + seqid + ", result = " + file.delete());
- }
- }
- for (String key : filesToRemove) {
- wal.fileLargestSequenceIDMap.remove(key);
- }
- }
- }
- } catch (Exception ex) {
- LOG.error("Uncaught exception in background worker", ex);
- }
- }
- LOG.warn(this.getClass().getSimpleName()
- + " moving on due to stop request");
- }
-
- public void shutdown() {
- run = false;
- this.interrupt();
- }
- }
-
-
- public void setRollSize(long rollSize) {
- this.rollSize = rollSize;
- }
-
- public void setMaxLogsSize(long maxLogsSize) {
- this.maxLogsSize = maxLogsSize;
- }
-
- public void setMinLogRetentionPeriod(long minLogRetentionPeriod) {
- this.minLogRetentionPeriod = minLogRetentionPeriod;
- }
-
- public void setWorkerInterval(long workerInterval) {
- this.workerInterval = workerInterval;
- }
-
- /**
- * Reads in a WAL and writes out a new WAL. Used if for some reason a replay
- * cannot occur due to the size of the WAL or assumptions about the number of
- * sequenceids.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static void main(String[] args) throws IOException,
- ClassNotFoundException {
- Preconditions.checkPositionIndex(0, args.length,
- "input directory is a required arg");
- Preconditions.checkPositionIndex(1, args.length,
- "output directory is a required arg");
- Preconditions.checkPositionIndex(2, args.length,
- "classname is a required arg");
- String input = args[0];
- String output = args[1];
- Class clazz = Class.forName(args[2].trim());
- WAL inputWAL = new WAL(new File(input), clazz);
- if (args.length == 4) {
- inputWAL.numReplaySequenceIDOverride = Integer.parseInt(args[3]);
- System.out.println("Overridng numReplaySequenceIDOverride: "
- + inputWAL.numReplaySequenceIDOverride);
- }
- WALReplayResult<?> result = inputWAL.replay();
- inputWAL.close();
- System.out.println(" SeqID: " + result.getSequenceID());
- System.out.println("NumEntries: " + result.getResults().size());
- WAL outputWAL = new WAL(new File(output), clazz);
- outputWAL.writeEntries(result.getResults());
- outputWAL.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
deleted file mode 100644
index 823f17c..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.collect.Lists;
-
-class WALDataFile<T extends Writable> {
-
- private static final int VERSION = 1;
-
- private static final int RECORD_TYPE_EVENT = 1;
- private static final int RECORD_TYPE_COMMIT = 2;
-
- static class Reader<T extends Writable> implements Closeable {
- Class<T> clazz;
- DataInputStream input;
- private Configuration conf = new Configuration();
- Reader(File path, Class<T> clazz) throws IOException {
- this.clazz = clazz;
- input = new DataInputStream(new FileInputStream(path));
- int version = input.readInt();
- if(version != VERSION) {
- throw new IOException("Expected " + VERSION + " and got " + version);
- }
- }
-
- List<WALEntry<T>> nextBatch() throws IOException {
- List<WALEntry<T>> batch = Lists.newArrayList();
- // read until we hit a commit marker or until the
- // commit marker is encountered
- while(true) {
- try {
- int type = input.readInt();
- if(type == RECORD_TYPE_EVENT) {
- WALEntry<T> entry = newWALEntry(clazz, conf);
- entry.readFields(input);
- batch.add(entry);
- } else if(type == RECORD_TYPE_COMMIT) {
- // we only return what we have read if we find a command entry
- return batch;
- } else {
- throw new IOException("Unknown record type " + Integer.toHexString(type));
- }
- } catch(EOFException e) {
- // in the EOF case, we crashed or shutdown while writing a batch
- // and were unable to complete that batch. As such the client
- // would have gotten an exception and retried or locally
- // stored the batch for resending later
- return null;
- }
- }
- }
-
- @Override
- public void close() throws IOException {
- if (input != null) {
- input.close();
- }
- }
- }
-
- /**
- * Append and flush operations are synchronized as we are modifying
- * a file in said methods.
- */
- static class Writer<T extends Writable> implements Closeable {
- private FileOutputStream fileOutput;
- private DataOutputStream dataOutput;
- private AtomicLong largestSequenceID = new AtomicLong(0);
- private File path;
-
- Writer(File path) throws IOException {
- this.path = path;
- fileOutput = new FileOutputStream(path);
- dataOutput = new DataOutputStream(fileOutput);
- dataOutput.writeInt(VERSION);
- flush();
- }
-
- // TODO group commit
- synchronized void append(List<WALEntry<T>> entries) throws IOException {
- for (WALEntry<T> entry : entries) {
- largestSequenceID.set(Math.max(entry.getSequenceID(), largestSequenceID.get()));
- dataOutput.writeInt(RECORD_TYPE_EVENT);
- entry.write(dataOutput);
- }
- // if this is successful, the events have been
- // successfully persisted and will be replayed
- // in the case of a crash
- dataOutput.writeInt(RECORD_TYPE_COMMIT);
- flush(false);
- }
-
- synchronized void flush() throws IOException {
- flush(true);
- }
- synchronized void flush(boolean metadata) throws IOException {
- fileOutput.getChannel().force(metadata);
- }
-
- public long getLargestSequenceID() {
- return largestSequenceID.get();
- }
- public File getPath() {
- return path;
- }
-
- public long getSize() {
- return dataOutput.size();
- }
-
- @Override
- public synchronized void close() throws IOException {
- if (dataOutput != null) {
- flush();
- dataOutput.close();
- }
- }
- }
- private static <T extends Writable> WALEntry<T> newWALEntry(Class<T> clazz, Configuration conf) {
- return new WALEntry<T>(ReflectionUtils.newInstance(clazz, conf));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java
deleted file mode 100644
index 3bb69f9..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Wraps a {@link Writable} with a sequence id so that both can
- * be written together to a file.
- */
-public class WALEntry<T extends Writable> implements Writable {
- /**
- * Provides a minimum guarantee we are not reading complete junk
- */
- private static final int MAGIC_HEADER = 0xdeadbeef;
-
- private T data;
- private long sequenceID;
- /**
- * Only to be used when reading a wal file from disk
- */
- WALEntry(T data) {
- this(data, -1);
- }
- /**
- * Creates a WALEntry with specified payload and sequence id
- * @param data
- * @param sequenceID
- */
- public WALEntry(T data, long sequenceID) {
- this.data = data;
- this.sequenceID = sequenceID;
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- int header = in.readInt();
- if(header != MAGIC_HEADER) {
- throw new IOException("Header is " + Integer.toHexString(header) +
- " expected " + Integer.toHexString(MAGIC_HEADER));
- }
- sequenceID = in.readLong();
- data.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(MAGIC_HEADER);
- out.writeLong(sequenceID);
- data.write(out);
- }
-
- public T getData() {
- return data;
- }
-
- public long getSequenceID() {
- return sequenceID;
- }
-
- static int getBaseSize() {
- int base = 4 /* magic header of type int */ + 8 /* seqid of type long */;
- return base;
- }
-}
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java b/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java
deleted file mode 100644
index e11b575..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-
-public class WALReplayResult<T extends Writable> {
-
- private List<WALEntry<T>> results;
- private long sequenceID;
-
- public WALReplayResult(List<WALEntry<T>> results, long sequenceID) {
- this.results = results;
- this.sequenceID = sequenceID;
- }
- public List<WALEntry<T>> getResults() {
- return results;
- }
- public long getSequenceID() {
- return sequenceID;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java b/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
deleted file mode 100644
index d6313d7..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.channel.recoverable.memory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.sink.NullSink;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-public class TestRecoverableMemoryChannel {
-
- private static final Logger logger = LoggerFactory
- .getLogger(TestRecoverableMemoryChannel.class);
-
- private RecoverableMemoryChannel channel;
- Context context;
-
- private File dataDir;
-
- @Before
- public void setUp() {
- dataDir = Files.createTempDir();
- Assert.assertTrue(dataDir.isDirectory());
- channel = createFileChannel();
-
- }
-
- private RecoverableMemoryChannel createFileChannel() {
- RecoverableMemoryChannel channel = new RecoverableMemoryChannel();
- context = new Context();
- context.put(RecoverableMemoryChannel.WAL_DATA_DIR, dataDir.getAbsolutePath());
- Configurables.configure(channel, context);
- channel.start();
- return channel;
- }
-
- @After
- public void teardown() {
- FileUtils.deleteQuietly(dataDir);
- }
- @Test
- public void testRestart() throws Exception {
- List<String> in = Lists.newArrayList();
- try {
- while(true) {
- in.addAll(putEvents(channel, "restart", 1, 1));
- }
- } catch (ChannelException e) {
- Assert.assertEquals("Cannot acquire capacity", e.getMessage());
- }
- channel.stop();
- channel = createFileChannel();
- List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
- Collections.sort(in);
- Collections.sort(out);
- Assert.assertEquals(in, out);
- }
- @Test
- public void testReconfigure() throws Exception {
- List<String> in = Lists.newArrayList();
- try {
- while(true) {
- in.addAll(putEvents(channel, "restart", 1, 1));
- }
- } catch (ChannelException e) {
- Assert.assertEquals("Cannot acquire capacity", e.getMessage());
- }
- Configurables.configure(channel, context);
- List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
- Collections.sort(in);
- Collections.sort(out);
- Assert.assertEquals(in, out);
- }
- @Test
- public void testRollbackWithSink() throws Exception {
- final NullSink nullSink = new NullSink();
- Context ctx = new Context();
- ctx.put("batchSize", "1");
- nullSink.configure(ctx);
- nullSink.setChannel(channel);
- final int numItems = 99;
- Thread t = new Thread() {
- @Override
- public void run() {
- int count = 0;
- while(count++ < numItems) {
- try {
- nullSink.process();
- Thread.sleep(1);
- } catch(EventDeliveryException e) {
- break;
- } catch (Exception e) {
- Throwables.propagate(e);
- }
- }
- }
- };
- t.setDaemon(true);
- t.setName("NullSink");
- t.start();
-
- putEvents(channel, "rollback", 10, 100);
-
- Transaction transaction;
- // put an item we will rollback
- transaction = channel.getTransaction();
- transaction.begin();
- channel.put(EventBuilder.withBody("this is going to be rolledback".getBytes(Charsets.UTF_8)));
- transaction.rollback();
- transaction.close();
-
- while(t.isAlive()) {
- Thread.sleep(1);
- }
-
-
- // simulate crash
- channel.stop();
- channel = createFileChannel();
-
- // get the item which was not rolled back
- transaction = channel.getTransaction();
- transaction.begin();
- Event event = channel.take();
- transaction.commit();
- transaction.close();
- Assert.assertNotNull(event);
- Assert.assertEquals("rollback-90-9", new String(event.getBody(), Charsets.UTF_8));
- }
-
-
- @Test
- public void testRollback() throws Exception {
- // put an item and commit
- putEvents(channel, "rollback", 1, 50);
-
- Transaction transaction;
- // put an item we will rollback
- transaction = channel.getTransaction();
- transaction.begin();
- channel.put(EventBuilder.withBody("this is going to be rolledback".getBytes(Charsets.UTF_8)));
- transaction.rollback();
- transaction.close();
-
- // simulate crash
- channel.stop();
- channel = createFileChannel();
-
- // get the item which was not rolled back
- transaction = channel.getTransaction();
- transaction.begin();
- Event event = channel.take();
- transaction.commit();
- transaction.close();
- Assert.assertNotNull(event);
- Assert.assertEquals("rollback-0-0", new String(event.getBody(), Charsets.UTF_8));
- }
- @Test
- public void testPut() throws Exception {
- // should find no items
- int found = takeEvents(channel, 1, 5).size();
- Assert.assertEquals(0, found);
- putEvents(channel, "unbatched", 1, 5);
- putEvents(channel, "batched", 5, 5);
- }
- @Test
- public void testThreaded() throws IOException, InterruptedException {
- int numThreads = 10;
- final CountDownLatch producerStopLatch = new CountDownLatch(numThreads);
- // due to limited capacity we must wait for consumers to start to put
- final CountDownLatch consumerStartLatch = new CountDownLatch(numThreads);
- final CountDownLatch consumerStopLatch = new CountDownLatch(numThreads);
- final List<Exception> errors = Collections
- .synchronizedList(new ArrayList<Exception>());
- final List<String> expected = Collections
- .synchronizedList(new ArrayList<String>());
- final List<String> actual = Collections
- .synchronizedList(new ArrayList<String>());
- for (int i = 0; i < numThreads; i++) {
- final int id = i;
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- consumerStartLatch.await();
- if (id % 2 == 0) {
- expected.addAll(putEvents(channel, Integer.toString(id), 1, 5));
- } else {
- expected.addAll(putEvents(channel, Integer.toString(id), 5, 5));
- }
- logger.info("Completed some puts " + expected.size());
- } catch (Exception e) {
- logger.error("Error doing puts", e);
- errors.add(e);
- } finally {
- producerStopLatch.countDown();
- }
- }
- };
- t.setDaemon(true);
- t.start();
- }
- for (int i = 0; i < numThreads; i++) {
- final int id = i;
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- consumerStartLatch.countDown();
- consumerStartLatch.await();
- while(!producerStopLatch.await(1, TimeUnit.SECONDS) ||
- expected.size() > actual.size()) {
- if (id % 2 == 0) {
- actual.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
- } else {
- actual.addAll(takeEvents(channel, 5, Integer.MAX_VALUE));
- }
- }
- if(actual.isEmpty()) {
- logger.error("Found nothing!");
- } else {
- logger.info("Completed some takes " + actual.size());
- }
- } catch (Exception e) {
- logger.error("Error doing takes", e);
- errors.add(e);
- } finally {
- consumerStopLatch.countDown();
- }
- }
- };
- t.setDaemon(true);
- t.start();
- }
- Assert.assertTrue("Timed out waiting for producers",
- producerStopLatch.await(30, TimeUnit.SECONDS));
- Assert.assertTrue("Timed out waiting for consumer",
- consumerStopLatch.await(30, TimeUnit.SECONDS));
- Assert.assertEquals(Collections.EMPTY_LIST, errors);
- Collections.sort(expected);
- Collections.sort(actual);
- Assert.assertEquals(expected, actual);
- }
- private static List<String> takeEvents(Channel channel, int batchSize,
- int numEvents) throws Exception {
- List<String> result = Lists.newArrayList();
- for (int i = 0; i < numEvents; i += batchSize) {
- for (int j = 0; j < batchSize; j++) {
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- try {
- Event event = channel.take();
- if(event == null) {
- transaction.commit();
- return result;
- }
- result.add(new String(event.getBody(), Charsets.UTF_8));
- transaction.commit();
- } catch (Exception ex) {
- transaction.rollback();
- throw ex;
- } finally {
- transaction.close();
- }
- }
- }
- return result;
- }
- private static List<String> putEvents(Channel channel, String prefix, int batchSize,
- int numEvents) throws Exception {
- List<String> result = Lists.newArrayList();
- for (int i = 0; i < numEvents; i += batchSize) {
- for (int j = 0; j < batchSize; j++) {
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- try {
- String s = prefix + "-" + i +"-" + j;
- Event event = EventBuilder.withBody(s.getBytes(Charsets.UTF_8));
- result.add(s);
- channel.put(event);
- transaction.commit();
- } catch (Exception ex) {
- transaction.rollback();
- throw ex;
- } finally {
- transaction.close();
- }
- }
- }
- return result;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java b/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java
deleted file mode 100644
index 4a7ce28..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.util.Random;
-
-import org.apache.flume.channel.recoverable.memory.wal.SequenceIDBuffer;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestSequenceIDBuffer {
-
- @Test
- public void testBinarySearch() {
- int size = 100;
- SequenceIDBuffer buffer = new SequenceIDBuffer(size);
- Assert.assertEquals(size, buffer.size());
- for (int i = 0; i < 100; i++) {
- buffer.put(i, i);
- }
- buffer.sort();
- Assert.assertFalse(buffer.exists(-1));
- Assert.assertFalse(buffer.exists(101));
- for (int i = 0; i < 100; i++) {
- Assert.assertTrue(buffer.exists(i));
- }
- }
-
- @Test
- public void testSortAndCompareTo() {
- int size = 100;
- SequenceIDBuffer buffer = new SequenceIDBuffer(size);
- Assert.assertEquals(size, buffer.size());
- Random random = new Random();
- for (int i = 0; i < 100; i++) {
- buffer.put(i, Math.abs(random.nextLong()));
- }
-
- buffer.sort();
-
- long last = Long.MIN_VALUE;
- for (int i = 0; i < 100; i++) {
- long current = buffer.get(i);
- Assert.assertTrue(last <= current);
- }
- }
-
- @Test
- public void testSwap() {
- SequenceIDBuffer buffer = new SequenceIDBuffer(2);
- buffer.put(0, Long.MAX_VALUE);
- buffer.put(1, Long.MIN_VALUE);
- buffer.swap(0, 1);
- Assert.assertEquals(buffer.get(0), Long.MIN_VALUE);
- Assert.assertEquals(buffer.get(1), Long.MAX_VALUE);
- }
-}
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java b/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java
deleted file mode 100644
index 52e3606..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.recoverable.memory.wal;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flume.channel.recoverable.memory.wal.WAL;
-import org.apache.flume.channel.recoverable.memory.wal.WALEntry;
-import org.apache.flume.channel.recoverable.memory.wal.WALReplayResult;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-public class TestWAL {
-
- private static final Logger logger = LoggerFactory
- .getLogger(TestWAL.class);
-
- private File dataDir;
- private WAL<Text> wal;
-
- @Before
- public void setup() throws IOException {
- dataDir = Files.createTempDir();
- Assert.assertTrue(dataDir.isDirectory());
- wal = new WAL<Text>(dataDir, Text.class);
- }
- @After
- public void teardown() throws IOException {
- wal.close();
- FileUtils.deleteQuietly(dataDir);
- }
-
- /**
- * Create a whole bunch of files and ensure they are cleaned up
- */
- @Test
- public void testRoll() throws IOException, InterruptedException {
- wal.close();
- wal = new WAL<Text>(dataDir, Text.class, 0L, 0L, 0L, 1L);
- long seqid = 0;
- List<String> expected = strings(100);
- for(String s : expected) {
- wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
- Thread.sleep(1);
- wal.writeSequenceID(seqid);
- Thread.sleep(1);
- }
- wal.writeSequenceID(Long.MAX_VALUE);
- Thread.sleep(1000L);
- wal.close();
- File seq = new File(dataDir, "seq");
- File[] seqFiles = seq.listFiles();
- Assert.assertNotNull(seqFiles);
- Assert.assertTrue(seqFiles.length < 5);
- File data = new File(dataDir, "data");
- File[] dataFiles = data.listFiles();
- Assert.assertNotNull(dataFiles);
- Assert.assertTrue(dataFiles.length < 5);
- }
-
- @Test
- public void testBasicReplay() throws IOException {
- long seqid = 0;
- List<String> expected = strings(100);
- for(String s : expected) {
- wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
- }
- wal.close();
- wal = new WAL<Text>(dataDir, Text.class);
- WALReplayResult<Text> result = wal.replay();
- Assert.assertEquals(100, result.getSequenceID());
- List<String> actual = toStringList(result.getResults());
- Assert.assertEquals(expected, actual);
- }
-
- @Test
- public void testReplayAtOffset() throws IOException {
- long seqid = 0;
- List<String> expected = strings(100);
- for(String s : expected) {
- wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
- }
- wal.writeSequenceID(50);
- expected.remove(50);
- wal.close();
- wal = new WAL<Text>(dataDir, Text.class);
- WALReplayResult<Text> result = wal.replay();
- Assert.assertEquals(100, result.getSequenceID());
- List<String> actual = toStringList(result.getResults());
- Collections.sort(expected);
- Collections.sort(actual);
- Assert.assertEquals(99, actual.size());
- Assert.assertEquals(expected, actual);
- }
-
- @Test
- public void testReplayNone() throws IOException {
- long seqid = 0;
- List<String> expected = strings(100);
- for(String s : expected) {
- wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
- wal.writeSequenceID(seqid);
- }
- wal.close();
- wal = new WAL<Text>(dataDir, Text.class);
- WALReplayResult<Text> result = wal.replay();
- Assert.assertEquals(expected.size(), result.getSequenceID());
- List<String> actual = toStringList(result.getResults());
- Assert.assertEquals(Collections.EMPTY_LIST, actual);
- }
-
- @Test
- public void testThreadedAppend() throws IOException, InterruptedException {
- int numThreads = 10;
- final CountDownLatch startLatch = new CountDownLatch(numThreads);
- final CountDownLatch stopLatch = new CountDownLatch(numThreads);
- final AtomicLong seqid = new AtomicLong(0);
- final List<String> globalExpected = Collections.synchronizedList(new ArrayList<String>());
- final List<Exception> errors = Collections.synchronizedList(new ArrayList<Exception>());
- for (int i = 0; i < numThreads; i++) {
- final int id = i;
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- List<String> expected = strings(100);
- globalExpected.addAll(expected);
- startLatch.countDown();
- startLatch.await();
- // half batch, half do not
- if(id % 2 == 0) {
- for(String s : expected) {
- wal.writeEntry(new WALEntry<Text>(new Text(s), seqid.incrementAndGet()));
- }
- } else {
- List<WALEntry<Text>> batch = Lists.newArrayList();
- for(String s : expected) {
- batch.add(new WALEntry<Text>(new Text(s), seqid.incrementAndGet()));
- }
- wal.writeEntries(batch);
- }
- } catch (Exception e) {
- logger.warn("Error doing appends", e);
- errors.add(e);
- } finally {
- stopLatch.countDown();
- }
- }
- };
- t.setDaemon(true);
- t.start();
- }
- Assert.assertTrue(stopLatch.await(30, TimeUnit.SECONDS));
- Assert.assertEquals(Collections.EMPTY_LIST, errors);
- wal.close();
- wal = new WAL<Text>(dataDir, Text.class);
- WALReplayResult<Text> result = wal.replay();
- Assert.assertEquals(1000, result.getSequenceID());
- List<String> actual = toStringList(result.getResults());
- // we don't know what order the items threads will be able to
- // append to the wal, so sort to the lists to make then sensible
- Collections.sort(actual);
- Collections.sort(globalExpected);
- Assert.assertEquals(globalExpected, actual);
- }
-
- @Test(expected=IOException.class)
- public void testInvalidReadClass() throws IOException {
- wal.writeEntry(new WALEntry<Text>(new Text(""), 1));
- wal.close();
- new WAL<IntWritable>(dataDir, IntWritable.class);
- }
-
- @Test(expected=NullPointerException.class)
- public void testCloseSingle() throws IOException {
- wal.close();
- wal.writeEntry(new WALEntry<Text>(new Text(""), 1));
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected=NullPointerException.class)
- public void testCloseList() throws IOException {
- wal.close();
- wal.writeEntries(Lists.newArrayList(new WALEntry<Text>(new Text(""), 1)));
- }
-
- @Test(expected=NullPointerException.class)
- public void testCloseSequenceID() throws IOException {
- wal.close();
- wal.writeSequenceID(1L);
- }
-
- private static List<String> strings(int num) {
- List<String> result = Lists.newArrayList();
- for (int i = 0; i < num; i++) {
- String s = Integer.toString(num);
- result.add(s);
- }
- return result;
- }
- private static List<String> toStringList(List<WALEntry<Text>> list) {
- List<String> result = Lists.newArrayList();
- for(WALEntry<Text> entry : list) {
- result.add(entry.getData().toString());
- }
- return result;
- }
-
- public static void main(String[] args) throws IOException {
- Preconditions.checkPositionIndex(0, args.length,
- "size of event is a required arg");
- Preconditions.checkPositionIndex(1, args.length,
- "batch size is a required arg");
-
- int size = Integer.parseInt(args[0]);
- int batchSize = Integer.parseInt(args[1]);
-
- byte[] buffer = new byte[size];
- for (int i = 0; i < buffer.length; i++) {
- buffer[i] = (byte)'A';
- }
- BytesWritable bytes = new BytesWritable(buffer);
- List<WALEntry<BytesWritable>> batch = Lists.newArrayList();
- long seqid = 0;
- long numBytes = 0;
- long count = 0;
- long start = System.currentTimeMillis();
- File dataDir = Files.createTempDir();
- try {
- WAL<BytesWritable> wal = new WAL<BytesWritable>(dataDir, BytesWritable.class);
- while(true) {
- batch.clear();
- for (int i = 0; i < batchSize; i++) {
- batch.add(new WALEntry<BytesWritable>(bytes, seqid++));
- }
- wal.writeEntries(batch);
- count += batchSize;
- numBytes += buffer.length * batchSize;
-
- long expired = System.currentTimeMillis() - start;
- if(expired > 10000L) {
- start = System.currentTimeMillis();
- System.out.println(String.format("Events/s %d, MB/s %4.2f", (count/10),
- (double)(numBytes/1024L/1024L)/(double)(expired/1000L)));
- numBytes = count = 0;
- }
- }
- } finally {
- FileUtils.deleteQuietly(dataDir);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties b/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties
deleted file mode 100644
index 739ecc8..0000000
--- a/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-log4j.rootLogger = INFO, out
-
-log4j.appender.out = org.apache.log4j.ConsoleAppender
-log4j.appender.out.layout = org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
-
-log4j.logger.org.apache.flume = DEBUG
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-channels/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/pom.xml b/flume-ng-channels/pom.xml
index 5c6fa76..6485d47 100644
--- a/flume-ng-channels/pom.xml
+++ b/flume-ng-channels/pom.xml
@@ -43,6 +43,5 @@ limitations under the License.
<modules>
<module>flume-jdbc-channel</module>
<module>flume-file-channel</module>
- <module>flume-recoverable-memory-channel</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
index 1e1a46f..26f4dd7 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
@@ -44,15 +44,7 @@ public class ChannelConfiguration extends ComponentConfiguration {
/**
* JDBC channel provided by org.apache.flume.channel.jdbc.JdbcChannel
*/
- JDBC("org.apache.flume.conf.channel.JdbcChannelConfiguration"),
-
- //For now, I am leaving just basic validation for recoverable
- //memory channel. This should be updated when proper config stubs are added.
- /**
- * Recoverable Memory Channel
- * @see org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
- */
- RECOVERABLEMEMORY("org.apache.flume.conf.channel.RecoverableMemoryChannelConfiguration");
+ JDBC("org.apache.flume.conf.channel.JdbcChannelConfiguration");
private String channelConfigurationType;
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
index 475341d..15b8cc3 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
@@ -44,15 +44,8 @@ public enum ChannelType {
/**
* JDBC channel provided by org.apache.flume.channel.jdbc.JdbcChannel
*/
- JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
+ JDBC("org.apache.flume.channel.jdbc.JdbcChannel");
- /**
- * Recoverable Memory Channel
- *
- * @see org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
- */
- RECOVERABLEMEMORY(
- "org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel");
private final String channelClassName;
http://git-wip-us.apache.org/repos/asf/flume/blob/08843202/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 7d7688c..f20ff1e 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -93,10 +93,6 @@
<artifactId>flume-jdbc-channel</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.flume.flume-ng-channels</groupId>
- <artifactId>flume-recoverable-memory-channel</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-hdfs-sink</artifactId>
</dependency>