You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/01/13 23:57:42 UTC
svn commit: r1231371 [1/2] - in /incubator/flume/branches/flume-728:
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/
flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/
flume-ng-channels/f...
Author: arvind
Date: Fri Jan 13 22:57:41 2012
New Revision: 1231371
URL: http://svn.apache.org/viewvc?rev=1231371&view=rev
Log:
FLUME-932. Making Flume-NG components name-aware and pluggable.
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java (with props)
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java (with props)
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java (with props)
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java (with props)
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java (with props)
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java (with props)
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java (with props)
Modified:
incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Source.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/FanoutChannel.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java Fri Jan 13 22:57:41 2012
@@ -31,6 +31,7 @@ import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
+import org.apache.flume.channel.AbstractChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +43,7 @@ import com.google.common.base.Preconditi
* its storage.
* </p>
*/
-public class FileChannel implements Channel {
+public class FileChannel extends AbstractChannel {
private static final Logger logger = LoggerFactory
.getLogger(FileChannel.class);
@@ -158,12 +159,6 @@ public class FileChannel implements Chan
}
@Override
- public void shutdown() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
public String getName() {
// TODO Auto-generated method stub
return null;
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java Fri Jan 13 22:57:41 2012
@@ -17,23 +17,21 @@
*/
package org.apache.flume.channel.jdbc;
-import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
+import org.apache.flume.channel.AbstractChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A JDBC based channel implementation.</p>
*/
-public class JdbcChannel implements Channel, Configurable {
+public class JdbcChannel extends AbstractChannel {
private static final Logger LOG = LoggerFactory.getLogger(JdbcChannel.class);
private JdbcChannelProvider provider;
- private String name;
/**
* Instantiates a new JDBC Channel.
@@ -57,15 +55,11 @@ public class JdbcChannel implements Chan
}
@Override
- public void shutdown() {
- JdbcChannelProviderFactory.releaseProvider(name);
+ public void stop() {
+ JdbcChannelProviderFactory.releaseProvider(getName());
provider = null;
- name = null;
- }
- @Override
- public String getName() {
- return name;
+ super.stop();
}
private JdbcChannelProvider getProvider() {
@@ -74,11 +68,8 @@ public class JdbcChannel implements Chan
@Override
public void configure(Context context) {
- // FIXME - allow name to be specified via the context
- this.name = "jdbc";
-
- provider = JdbcChannelProviderFactory.getProvider(context, name);
+ provider = JdbcChannelProviderFactory.getProvider(context, getName());
- LOG.info("JDBC Channel initialized: " + name);
+ LOG.info("JDBC Channel initialized: " + getName());
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Fri Jan 13 22:57:41 2012
@@ -248,6 +248,8 @@ public class JdbcChannelProviderImpl imp
tx.close();
}
}
+
+ LOGGER.info("Persistend event: " + persistableEvent.getEventId());
}
@Override
@@ -271,6 +273,13 @@ public class JdbcChannelProviderImpl imp
tx.close();
}
}
+
+ if (result != null) {
+ LOGGER.info("Removed event: " + ((PersistableEvent) result).getEventId());
+ } else {
+ LOGGER.info("No event found for removal");
+ }
+
return result;
}
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java Fri Jan 13 22:57:41 2012
@@ -107,7 +107,7 @@ public class JdbcTransactionImpl impleme
LOGGER.info("Attempting transaction roll-back");
connection.rollback();
} else {
- LOGGER.info("Attempting transaction commit");
+ LOGGER.debug("Attempting transaction commit");
connection.commit();
}
} catch (SQLException ex) {
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java Fri Jan 13 22:57:41 2012
@@ -85,6 +85,10 @@ public class PersistableEvent implements
this.eventId = eventId;
}
+ protected long getEventId() {
+ return this.eventId;
+ }
+
public List<HeaderEntry> getHeaderEntries() {
return headers;
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java Fri Jan 13 22:57:41 2012
@@ -17,6 +17,8 @@
*/
package org.apache.flume;
+import org.apache.flume.lifecycle.LifecycleAware;
+
/**
* <p>
* A channel connects a <tt>Source</tt> to a <tt>Sink</tt>. The source
@@ -42,7 +44,7 @@ package org.apache.flume;
* @see org.apache.flume.EventSink
* @see org.apache.flume.Transaction
*/
-public interface Channel {
+public interface Channel extends LifecycleAware, NamedComponent {
/**
* <p>Puts the given event in the channel.</p>
@@ -73,16 +75,4 @@ public interface Channel {
* @return the transaction instance associated with this channel.
*/
public Transaction getTransaction();
-
- /**
- * Instructs the channel to release any resources held in preparation of
- * shutting down.
- */
- public void shutdown();
-
- /**
- * @return the channel name.
- * @return
- */
- public String getName();
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java Fri Jan 13 22:57:41 2012
@@ -17,18 +17,11 @@
*/
package org.apache.flume;
-import java.util.Map;
-import java.util.Set;
public interface ChannelFactory {
- public boolean register(String name, Class<? extends Channel> channelClass);
+ public Channel create(String name, String type) throws FlumeException;
- public boolean unregister(String name);
+ public boolean unregister(Channel channel);
- public Channel create(String name) throws InstantiationException;
-
- public Channel createFanout(String chList, Map<String, Channel> chMap) throws InstantiationException;
-
- public Set<String> getChannelNames();
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume;
+
+/**
+ * Base class of all flume exceptions.
+ */
+public class FlumeException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public FlumeException(String msg) {
+ super(msg);
+ }
+
+ public FlumeException(String msg, Throwable th) {
+ super(msg, th);
+ }
+
+ public FlumeException(Throwable th) {
+ super(th);
+ }
+
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume;
+
+public interface NamedComponent {
+
+ public void setName(String name);
+
+ public String getName();
+
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java Fri Jan 13 22:57:41 2012
@@ -21,7 +21,7 @@ package org.apache.flume;
import org.apache.flume.lifecycle.LifecycleAware;
-public interface Sink extends LifecycleAware {
+public interface Sink extends LifecycleAware, NamedComponent {
public void setChannel(Channel channel);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java Fri Jan 13 22:57:41 2012
@@ -19,16 +19,12 @@
package org.apache.flume;
-import java.util.Set;
public interface SinkFactory {
- public boolean register(String name, Class<? extends Sink> sinkClass);
+ public Sink create(String name, String type)
+ throws FlumeException;
- public boolean unregister(String name);
-
- public Sink create(String name) throws InstantiationException;
-
- public Set<String> getSinkNames();
+ public boolean unregister(Sink sink);
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Source.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Source.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Source.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Source.java Fri Jan 13 22:57:41 2012
@@ -19,12 +19,13 @@
package org.apache.flume;
-import org.apache.flume.lifecycle.LifecycleAware;
+import java.util.List;
-public interface Source extends LifecycleAware {
+import org.apache.flume.lifecycle.LifecycleAware;
- public void setChannel(Channel channel);
+public interface Source extends LifecycleAware, NamedComponent {
- public Channel getChannel();
+ public void setChannels(List<Channel> channels);
+ public List<Channel> getChannels();
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java Fri Jan 13 22:57:41 2012
@@ -19,17 +19,11 @@
package org.apache.flume;
-import java.util.Set;
public interface SourceFactory {
- public boolean register(String sourceName,
- Class<? extends Source> sourceClass);
-
- public boolean unregister(String sourceName);
-
- public Source create(String sourceName) throws InstantiationException;
-
- public Set<String> getSourceNames();
+ public Source create(String sourceName, String type)
+ throws FlumeException;
+ public boolean unregister(Source source);
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+
+public abstract class AbstractChannel
+ implements Channel, LifecycleAware, Configurable {
+
+ private String name;
+
+ private LifecycleState lifecycleState;
+
+ public AbstractChannel() {
+ lifecycleState = LifecycleState.IDLE;
+ }
+
+ @Override
+ public synchronized void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public synchronized void start() {
+ lifecycleState = LifecycleState.START;
+ }
+
+ @Override
+ public synchronized void stop() {
+ lifecycleState = LifecycleState.STOP;
+ }
+
+ @Override
+ public synchronized LifecycleState getLifecycleState() {
+ return lifecycleState;
+ }
+
+ @Override
+ public synchronized String getName() {
+ return name;
+ }
+
+ @Override
+ public void configure(Context context) {
+
+ }
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel;
+
+/**
+ * Enumeration of built in channel types available in the system.
+ */
+public enum ChannelType {
+
+ /**
+ * Place holder for custom channels not part of this enumeration.
+ */
+ OTHER(null),
+
+ /**
+ * Memory channel
+ * @see MemoryChannel
+ */
+ MEMORY(MemoryChannel.class.getName()),
+
+ /**
+ * Fan-out channel
+ * @see FanoutChannel
+ */
+ FAN_OUT(FanoutChannel.class.getName()),
+
+ /**
+ * JDBC channel provided by org.apache.flume.channel.jdbc.JdbcChannel
+ */
+ JDBC("org.apache.flume.channel.jdbc.JdbcChannel");
+
+ private final String channelClassName;
+
+ private ChannelType(String channelClassName) {
+ this.channelClassName = channelClassName;
+ }
+
+ public String getChannelClassName() {
+ return channelClassName;
+ }
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java Fri Jan 13 22:57:41 2012
@@ -21,11 +21,10 @@ package org.apache.flume.channel;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
import org.apache.flume.Channel;
import org.apache.flume.ChannelFactory;
+import org.apache.flume.FlumeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,83 +35,93 @@ public class DefaultChannelFactory imple
private static final Logger logger = LoggerFactory
.getLogger(DefaultChannelFactory.class);
- private Map<String, Class<? extends Channel>> channelRegistry;
+ private Map<Class<?>, Map<String, Channel>> channels;
public DefaultChannelFactory() {
- channelRegistry = new HashMap<String, Class<? extends Channel>>();
+ channels = new HashMap<Class<?>, Map<String, Channel>>();
}
@Override
- public boolean register(String name, Class<? extends Channel> channelClass) {
- logger.info("Register channel name:{} class:{}", name, channelClass);
+ public boolean unregister(Channel channel) {
+ Preconditions.checkNotNull(channel);
+ logger.info("Unregister channel {}", channel);
+ boolean removed = false;
- if (channelRegistry.containsKey(name)) {
- return false;
- }
-
- channelRegistry.put(name, channelClass);
- return true;
- }
+ Map<String, Channel> channelMap = channels.get(channel.getClass());
+ if (channelMap != null) {
+ removed = (channelMap.remove(channel.getName()) != null);
- @Override
- public boolean unregister(String name) {
- logger.info("Unregister channel class:{}", name);
-
- return channelRegistry.remove(name) != null;
- }
+ if (channelMap.size() == 0) {
+ channels.remove(channel.getClass());
+ }
+ }
- @Override
- public Set<String> getChannelNames() {
- return channelRegistry.keySet();
+ return removed;
}
+ @SuppressWarnings("unchecked")
@Override
- public Channel create(String name) throws InstantiationException {
+ public Channel create(String name, String type) throws FlumeException {
Preconditions.checkNotNull(name);
+ Preconditions.checkNotNull(type);
+ logger.debug("Creating instance of channel {} type {}", name, type);
- logger.debug("Creating instance of channel {}", name);
+ String channelClassName = type;
- if (!channelRegistry.containsKey(name)) {
- return null;
+ ChannelType channelType = ChannelType.OTHER;
+ try {
+ channelType = ChannelType.valueOf(type.toUpperCase());
+ } catch (IllegalArgumentException ex) {
+ logger.debug("Channel type {} is a custom type", type);
}
- Channel channel = null;
+ if (!channelType.equals(ChannelType.OTHER)) {
+ channelClassName = channelType.getChannelClassName();
+ }
+ Class<? extends Channel> channelClass = null;
try {
- channel = channelRegistry.get(name).newInstance();
- } catch (IllegalAccessException e) {
- throw new InstantiationException("Unable to create channel " + name
- + " due to " + e.getMessage());
+ channelClass = (Class<? extends Channel>) Class.forName(channelClassName);
+ } catch (Exception ex) {
+ throw new FlumeException("Unable to load channel type: " + type
+ + ", class: " + channelClassName, ex);
}
- return channel;
- }
+ Map<String, Channel> channelMap = channels.get(channelClass);
+ if (channelMap == null) {
+ channelMap = new HashMap<String, Channel>();
+ channels.put(channelClass, channelMap);
+ }
- @Override
- public String toString() {
- return "{ channelRegistry:" + channelRegistry + " }";
- }
+ Channel channel = channelMap.get(name);
- public Map<String, Class<? extends Channel>> getChannelRegistry() {
- return channelRegistry;
- }
+ if (channel == null) {
+ try {
+ channel = channelClass.newInstance();
+ channel.setName(name);
+ channelMap.put(name, channel);
+ } catch (Exception ex) {
+ // Clean up channel map
+ channels.remove(channelClass);
+ throw new FlumeException("Unable to create channel: " + name
+ + ", type: " + type + ", class: " + channelClassName, ex);
+ }
+ }
- public void setChannelRegistry(
- Map<String, Class<? extends Channel>> channelRegistry) {
- this.channelRegistry = channelRegistry;
+ return channel;
}
- // build a fanout channel from the list of channel names and map of <name,channels>
- @Override
- public Channel createFanout(String chList, Map<String, Channel> chMap)
- throws InstantiationException {
-
- FanoutChannel fnc = (FanoutChannel)create("fanout");
- StringTokenizer tk = new StringTokenizer(chList, ",");
- while (tk.hasMoreTokens()) {
- fnc.addFanout(chMap.get(tk.nextToken()));
+ public Map<Class<?>, Map<String, Channel>> getRegistryClone() {
+ Map<Class<?>, Map<String, Channel>> result =
+ new HashMap<Class<?>, Map<String, Channel>>();
+
+ for (Class<?> klass : channels.keySet()) {
+ Map<String, Channel> channelMap = channels.get(klass);
+ Map<String, Channel> resultChannelMap = new HashMap<String, Channel>();
+ resultChannelMap.putAll(channelMap);
+ result.put(klass, resultChannelMap);
}
- return fnc;
- }
+ return result;
+ }
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/FanoutChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/FanoutChannel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/FanoutChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/FanoutChannel.java Fri Jan 13 22:57:41 2012
@@ -23,20 +23,22 @@ import java.util.LinkedList;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
+import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FanoutChannel implements Channel {
+public class FanoutChannel extends AbstractChannel {
private final Logger logger = LoggerFactory
.getLogger(FanoutChannel.class);
/**
* A wrapper transaction that does the operation on all channels.
* Note that there's no two phase commit. If one of the channels
- * throws an exception, we still execute the operations for the
- * rest. All the failed commits are rolled back at the end to
+ * throws an exception, we still execute the operations for the
+ * rest. All the failed commits are rolled back at the end to
* maintain consistent transaction state.
* Note that the currently commit and rollback are not throwing
* exceptions * even if one of the underlying transactions fail.
@@ -156,15 +158,11 @@ public class FanoutChannel implements Ch
}
@Override
- public void shutdown() {
+ public void stop() {
for (Channel ch : channelList) {
- ch.shutdown();
+ ch.stop();
}
- }
- @Override
- public String getName() {
- return "SourceFanout";
+ super.stop();
}
-
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Fri Jan 13 22:57:41 2012
@@ -42,9 +42,9 @@ import com.google.common.base.Preconditi
* common queue. Channel has a marker for the last committed event in order to
* avoid sink reading uncommitted data. The transactions keep track of the
* actions to perform undo when rolled back.
- *
+ *
*/
-public class MemoryChannel implements Channel, Configurable {
+public class MemoryChannel extends AbstractChannel {
private static final Integer defaultCapacity = 50;
private static final Integer defaultKeepAlive = 3;
@@ -88,8 +88,8 @@ public class MemoryChannel implements Ch
}
@Override
- /**
- * Start the transaction
+ /**
+ * Start the transaction
* initialize the undo lists, stamps
* set transaction state to Started
*/
@@ -107,7 +107,7 @@ public class MemoryChannel implements Ch
@Override
/**
* Commit the transaction
- * If there was an event added by this transaction, then set the
+ * If there was an event added by this transaction, then set the
* commit stamp set transaction state to Committed
*/
public void commit() {
@@ -149,7 +149,7 @@ public class MemoryChannel implements Ch
}
@Override
- /**
+ /**
* Close the transaction
* if the transaction is still open, then roll it back
* set transaction state to Closed
@@ -243,7 +243,7 @@ public class MemoryChannel implements Ch
}
@Override
- /**
+ /**
* Add the given event to the end of the queue
* save the event in the undoPut queue for possible rollback
* save the stamp of this put for commit
@@ -270,7 +270,7 @@ public class MemoryChannel implements Ch
/**
* undo of put for all the events in the undoPut queue, remove those from the
* event queue
- *
+ *
* @param myTxn
*/
protected void undoPut(MemTransaction myTxn) {
@@ -326,7 +326,7 @@ public class MemoryChannel implements Ch
/**
* undo of take operation for each event in the undoTake list, add it back to
* the event queue
- *
+ *
* @param myTxn
*/
protected void undoTake(MemTransaction myTxn) {
@@ -357,7 +357,7 @@ public class MemoryChannel implements Ch
/**
* Remove the given transaction from the list of open transactions
- *
+ *
* @param myTxn
*/
protected void forgetTransaction(MemTransaction myTxn) {
@@ -374,16 +374,4 @@ public class MemoryChannel implements Ch
return null;
}
}
-
- @Override
- public void shutdown() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public String getName() {
- // TODO Auto-generated method stub
- return null;
- }
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java Fri Jan 13 22:57:41 2012
@@ -77,7 +77,7 @@ import com.google.common.base.Preconditi
* TODO
* </p>
*/
-public class PseudoTxnMemoryChannel implements Channel, Configurable {
+public class PseudoTxnMemoryChannel extends AbstractChannel {
private static final Integer defaultCapacity = 50;
private static final Integer defaultKeepAlive = 3;
@@ -163,16 +163,4 @@ public class PseudoTxnMemoryChannel impl
public void close() {
}
}
-
- @Override
- public void shutdown() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public String getName() {
- // TODO Auto-generated method stub
- return null;
- }
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java Fri Jan 13 22:57:41 2012
@@ -29,6 +29,7 @@ import com.google.common.base.Preconditi
abstract public class AbstractSink implements Sink, LifecycleAware {
private Channel channel;
+ private String name;
private LifecycleState lifecycleState;
@@ -62,4 +63,14 @@ abstract public class AbstractSink imple
return lifecycleState;
}
+ @Override
+ public synchronized void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public synchronized String getName() {
+ return name;
+ }
+
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java Fri Jan 13 22:57:41 2012
@@ -21,8 +21,8 @@ package org.apache.flume.sink;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
+import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.SinkFactory;
import org.slf4j.Logger;
@@ -35,70 +35,95 @@ public class DefaultSinkFactory implemen
private static final Logger logger = LoggerFactory
.getLogger(DefaultSinkFactory.class);
- public Map<String, Class<? extends Sink>> sinkRegistry;
+ private final Map<Class<?>, Map<String, Sink>> sinks;
public DefaultSinkFactory() {
- sinkRegistry = new HashMap<String, Class<? extends Sink>>();
+ sinks = new HashMap<Class<?>, Map<String, Sink>>();
}
@Override
- public boolean register(String name, Class<? extends Sink> sinkClass) {
- logger.info("Register sink name:{} class:{}", name, sinkClass);
-
- if (sinkRegistry.containsKey(name)) {
- return false;
+ public synchronized boolean unregister(Sink sink) {
+ Preconditions.checkNotNull(sink);
+ boolean removed = false;
+
+ logger.debug("Unregistering sink {}", sink);
+
+ Map<String, Sink> sinkMap = sinks.get(sink.getClass());
+ if (sinkMap != null) {
+ removed = (sinkMap.remove(sink.getName()) != null);
+
+ if (sinkMap.size() == 0) {
+ sinks.remove(sink.getClass());
+ }
}
- sinkRegistry.put(name, sinkClass);
- return true;
+ return removed;
}
+ @SuppressWarnings("unchecked")
@Override
- public boolean unregister(String name) {
- logger.info("Unregister source class:{}", name);
+ public Sink create(String name, String type)
+ throws FlumeException {
+ Preconditions.checkNotNull(name);
+ Preconditions.checkNotNull(type);
+ logger.info("Creating instance of sink {} type{}", name, type);
- return sinkRegistry.remove(name) != null;
- }
+ String sinkClassName = type;
- @Override
- public Set<String> getSinkNames() {
- return sinkRegistry.keySet();
- }
+ SinkType sinkType = SinkType.OTHER;
+ try {
+ sinkType = SinkType.valueOf(type.toUpperCase());
+ } catch (IllegalArgumentException ex) {
+ logger.debug("Sink type {} is a custom type", type);
+ }
- @Override
- public Sink create(String name) throws InstantiationException {
- Preconditions.checkNotNull(name);
+ if (!sinkType.equals(SinkType.OTHER)) {
+ sinkClassName = sinkType.getSinkClassName();
+ }
- logger.debug("Creating instance of sink {}", name);
+ Class<? extends Sink> sinkClass = null;
+ try {
+ sinkClass = (Class<? extends Sink>) Class.forName(sinkClassName);
+ } catch (Exception ex) {
+ throw new FlumeException("Unable to load sink type: " + type
+ + ", class: " + sinkClassName, ex);
+ }
- if (!sinkRegistry.containsKey(name)) {
- return null;
+ Map<String, Sink> sinkMap = sinks.get(sinkClass);
+ if (sinkMap == null) {
+ sinkMap = new HashMap<String, Sink>();
+ sinks.put(sinkClass, sinkMap);
}
- Sink sink = null;
+ Sink sink = sinkMap.get(name);
- try {
- sink = sinkRegistry.get(name).newInstance();
- } catch (IllegalAccessException e) {
- throw new InstantiationException("Unable to create sink " + name
- + " due to " + e.getMessage());
+ if (sink == null) {
+ try {
+ sink = sinkClass.newInstance();
+ sink.setName(name);
+ sinkMap.put(name, sink);
+ } catch (Exception ex) {
+ // Clean up the sink map
+ sinks.remove(sinkClass);
+ throw new FlumeException("Unable to create sink: " + name
+ + ", type: " + type + ", class: " + sinkClassName, ex);
+ }
}
return sink;
}
- @Override
- public String toString() {
- return "{ sinkRegistry:" + sinkRegistry + " }";
- }
-
- public Map<String, Class<? extends Sink>> getSinkRegistry() {
- return sinkRegistry;
- }
+ public synchronized Map<Class<?>, Map<String, Sink>> getRegistryClone() {
+ Map<Class<?>, Map<String, Sink>> result =
+ new HashMap<Class<?>, Map<String, Sink>>();
+
+ for (Class<?> klass : sinks.keySet()) {
+ Map<String, Sink> sinkMap = sinks.get(klass);
+ Map<String, Sink> resultSinkMap = new HashMap<String, Sink>();
+ resultSinkMap.putAll(sinkMap);
+ result.put(klass, resultSinkMap);
+ }
- public void setSinkRegistry(
- Map<String, Class<? extends Sink>> sinkRegistry) {
- this.sinkRegistry = sinkRegistry;
+ return result;
}
-
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink;
+
+
+/**
+ * Enumeration of built in sink types available in the system.
+ */
+public enum SinkType {
+
+ /**
+ * Place holder for custom sinks not part of this enumeration.
+ */
+ OTHER(null),
+
+
+ /**
+ * Null sink
+ * @see NullSink
+ */
+ NULL(NullSink.class.getName()),
+
+ /**
+ * Logger sink
+ * @see LoggerSink
+ */
+ LOGGER(LoggerSink.class.getName()),
+
+ /**
+ * Rolling file sink
+ * @see RollingFileSink
+ */
+ FILE_ROLL(RollingFileSink.class.getName()),
+
+ /**
+ * HDFS Sink provided by org.apache.flume.sink.hdfs.HDFSEventSink
+ */
+ HDFS("org.apache.flume.sink.hdfs.HDFSEventSink"),
+
+ /**
+ * Avro sink
+ * @see AvroSink
+ */
+ AVRO(AvroSink.class.getName());
+
+ private final String sinkClassName;
+
+ private SinkType(String sinkClassName) {
+ this.sinkClassName = sinkClassName;
+ }
+
+ public String getSinkClassName() {
+ return sinkClassName;
+ }
+
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java Fri Jan 13 22:57:41 2012
@@ -19,6 +19,9 @@
package org.apache.flume.source;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.flume.Channel;
import org.apache.flume.Source;
import org.apache.flume.lifecycle.LifecycleState;
@@ -27,37 +30,51 @@ import com.google.common.base.Preconditi
abstract public class AbstractSource implements Source {
- private Channel channel;
+ private List<Channel> channels;
+ private String name;
private LifecycleState lifecycleState;
public AbstractSource() {
+ channels = new ArrayList<Channel>();
lifecycleState = LifecycleState.IDLE;
}
@Override
- public void start() {
- Preconditions.checkState(channel != null, "No channel configured");
+ public synchronized void start() {
+ Preconditions.checkState(channels != null, "No channel configured");
lifecycleState = LifecycleState.START;
}
@Override
- public void stop() {
+ public synchronized void stop() {
lifecycleState = LifecycleState.STOP;
}
- public Channel getChannel() {
- return channel;
+ @Override
+ public synchronized List<Channel> getChannels() {
+ return channels;
}
- public void setChannel(Channel channel) {
- this.channel = channel;
+ @Override
+ public synchronized void setChannels(List<Channel> channels) {
+ this.channels.clear();
+ this.channels.addAll(channels);
}
@Override
- public LifecycleState getLifecycleState() {
+ public synchronized LifecycleState getLifecycleState() {
return lifecycleState;
}
+ @Override
+ public synchronized void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public synchronized String getName() {
+ return name;
+ }
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Fri Jan 13 22:57:41 2012
@@ -161,30 +161,33 @@ public class AvroSource extends Abstract
counterGroup.incrementAndGet("rpc.received");
- Channel channel = getChannel();
- Transaction transaction = channel.getTransaction();
+ List<Channel> channels = getChannels();
- try {
- transaction.begin();
+ for (Channel channel : channels) {
+ Transaction transaction = channel.getTransaction();
- Map<String, String> headers = new HashMap<String, String>();
+ try {
+ transaction.begin();
- for (Entry<CharSequence, CharSequence> entry : avroEvent.headers
- .entrySet()) {
+ Map<String, String> headers = new HashMap<String, String>();
- headers.put(entry.getKey().toString(), entry.getValue().toString());
- }
+ for (Entry<CharSequence, CharSequence> entry : avroEvent.headers
+ .entrySet()) {
+
+ headers.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+
+ Event event = EventBuilder.withBody(avroEvent.body.array(), headers);
+ channel.put(event);
+ counterGroup.incrementAndGet("rpc.events");
- Event event = EventBuilder.withBody(avroEvent.body.array(), headers);
- channel.put(event);
- counterGroup.incrementAndGet("rpc.events");
-
- transaction.commit();
- } catch (ChannelException e) {
- transaction.rollback();
- return Status.FAILED;
- } finally {
- transaction.close();
+ transaction.commit();
+ } catch (ChannelException e) {
+ transaction.rollback();
+ return Status.FAILED;
+ } finally {
+ transaction.close();
+ }
}
counterGroup.incrementAndGet("rpc.successful");
@@ -196,32 +199,35 @@ public class AvroSource extends Abstract
public Status appendBatch(List<AvroFlumeEvent> events) {
counterGroup.incrementAndGet("rpc.received.batch");
- Channel channel = getChannel();
- Transaction transaction = channel.getTransaction();
+ List<Channel> channels = getChannels();
- try {
- transaction.begin();
+ for (Channel channel : channels) {
+ Transaction transaction = channel.getTransaction();
- for (AvroFlumeEvent avroEvent : events) {
- Map<String, String> headers = new HashMap<String, String>();
+ try {
+ transaction.begin();
- for (Entry<CharSequence, CharSequence> entry : avroEvent.headers
- .entrySet()) {
+ for (AvroFlumeEvent avroEvent : events) {
+ Map<String, String> headers = new HashMap<String, String>();
- headers.put(entry.getKey().toString(), entry.getValue().toString());
+ for (Entry<CharSequence, CharSequence> entry : avroEvent.headers
+ .entrySet()) {
+
+ headers.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+
+ Event event = EventBuilder.withBody(avroEvent.body.array(), headers);
+ channel.put(event);
+ counterGroup.incrementAndGet("rpc.events");
}
- Event event = EventBuilder.withBody(avroEvent.body.array(), headers);
- channel.put(event);
- counterGroup.incrementAndGet("rpc.events");
+ transaction.commit();
+ } catch (ChannelException e) {
+ transaction.rollback();
+ return Status.FAILED;
+ } finally {
+ transaction.close();
}
-
- transaction.commit();
- } catch (ChannelException e) {
- transaction.rollback();
- return Status.FAILED;
- } finally {
- transaction.close();
}
counterGroup.incrementAndGet("rpc.successful");
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java Fri Jan 13 22:57:41 2012
@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.flume.source;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
+import org.apache.flume.FlumeException;
import org.apache.flume.Source;
import org.apache.flume.SourceFactory;
import org.slf4j.Logger;
@@ -35,71 +34,100 @@ public class DefaultSourceFactory implem
private static final Logger logger = LoggerFactory
.getLogger(DefaultSourceFactory.class);
- public Map<String, Class<? extends Source>> sourceRegistry;
+ /**
+ * Cache of sources created thus far. The outer map is keyed on the source
+ * type and the inner map is keyed on source name.
+ */
+ private final Map<Class<?>, Map<String, Source>> sources;
public DefaultSourceFactory() {
- sourceRegistry = new HashMap<String, Class<? extends Source>>();
+ sources = new HashMap<Class<?>, Map<String, Source>>();
}
@Override
- public boolean register(String name, Class<? extends Source> sourceClass) {
- logger.info("Register source name:{} class:{}", name, sourceClass);
-
- if (sourceRegistry.containsKey(name)) {
- return false;
+ public synchronized boolean unregister(Source source) {
+ Preconditions.checkNotNull(source);
+ boolean removed = false;
+
+ logger.debug("Unregistering source {}", source);
+
+ Map<String, Source> sourceMap = sources.get(source.getClass());
+ if (sourceMap != null) {
+ removed = (sourceMap.remove(source.getName()) != null);
+
+ if (sourceMap.size() == 0) {
+ sources.remove(source.getClass());
+ }
}
- sourceRegistry.put(name, sourceClass);
- return true;
+ return removed;
}
+ @SuppressWarnings("unchecked")
@Override
- public boolean unregister(String name) {
- logger.info("Unregister source class:{}", name);
+ public synchronized Source create(String name, String type)
+ throws FlumeException {
+ Preconditions.checkNotNull(name);
+ Preconditions.checkNotNull(type);
- return sourceRegistry.remove(name) != null;
- }
+ logger.debug("Creating instance of source {}, type {}", name, type);
- @Override
- public Set<String> getSourceNames() {
- return sourceRegistry.keySet();
- }
+ String sourceClassName = type;
- @Override
- public Source create(String name) throws InstantiationException {
- Preconditions.checkNotNull(name);
+ SourceType srcType = SourceType.OTHER;
+ try {
+ srcType = SourceType.valueOf(type.toUpperCase());
+ } catch (IllegalArgumentException ex) {
+ logger.debug("Source type {} is a custom type", type);
+ }
- logger.debug("Creating instance of source {}", name);
+ if (!srcType.equals(SourceType.OTHER)) {
+ sourceClassName = srcType.getSourceClassName();
+ }
- /* FIXME: Is returning null really a good idea? Should we just panic? */
- if (!sourceRegistry.containsKey(name)) {
- return null;
+ Class<? extends Source> sourceClass = null;
+ try {
+ sourceClass = (Class<? extends Source>) Class.forName(sourceClassName);
+ } catch (Exception ex) {
+ throw new FlumeException("Unable to load source type: " + type
+ + ", class: " + sourceClassName, ex);
}
- Source source = null;
+ Map<String, Source> sourceMap = sources.get(sourceClass);
+ if (sourceMap == null) {
+ sourceMap = new HashMap<String, Source>();
+ sources.put(sourceClass, sourceMap);
+ }
- try {
- source = sourceRegistry.get(name).newInstance();
- } catch (IllegalAccessException e) {
- throw new InstantiationException("Unable to create source " + name
- + " due to " + e.getMessage());
+ Source source = sourceMap.get(name);
+
+ if (source == null) {
+ try {
+ source = sourceClass.newInstance();
+ source.setName(name);
+ sourceMap.put(name, source);
+ } catch (Exception ex) {
+ // Clean up the source map
+ sources.remove(sourceClass);
+ throw new FlumeException("Unable to create source: " + name
+ +", type: " + type + ", class: " + sourceClassName, ex);
+ }
}
return source;
}
- @Override
- public String toString() {
- return "{ sinkRegistry:" + sourceRegistry + " }";
- }
-
- public Map<String, Class<? extends Source>> getSourceRegistry() {
- return sourceRegistry;
- }
+ public synchronized Map<Class<?>, Map<String, Source>> getRegistryClone() {
+ Map<Class<?>, Map<String, Source>> result =
+ new HashMap<Class<?>, Map<String, Source>>();
+
+ for (Class<?> klass : sources.keySet()) {
+ Map<String, Source> srcMap = sources.get(klass);
+ Map<String, Source> resultSrcMap = new HashMap<String, Source>();
+ resultSrcMap.putAll(srcMap);
+ result.put(klass, resultSrcMap);
+ }
- public void setSourceRegistry(
- Map<String, Class<? extends Source>> sourceRegistry) {
- this.sourceRegistry = sourceRegistry;
+ return result;
}
-
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Fri Jan 13 22:57:41 2012
@@ -20,8 +20,8 @@
package org.apache.flume.source;
import java.io.BufferedReader;
-import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -130,7 +130,7 @@ public class ExecSource extends Abstract
ExecRunnable runner = new ExecRunnable();
runner.command = command;
- runner.channel = getChannel();
+ runner.channels = getChannels();
runner.counterGroup = counterGroup;
// FIXME: Use a callback-like executor / future to signal us upon failure.
@@ -163,8 +163,8 @@ public class ExecSource extends Abstract
try {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- logger
- .debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
+ logger.debug("Interrupted while waiting for exec executor service "
+ + "to stop. Just exiting.");
Thread.currentThread().interrupt();
}
}
@@ -186,12 +186,12 @@ public class ExecSource extends Abstract
private static class ExecRunnable implements Runnable {
private String command;
- private Channel channel;
+ private List<Channel> channels;
private CounterGroup counterGroup;
@Override
public void run() {
-
+
try {
String[] commandArgs = command.split("\\s+");
Process process = new ProcessBuilder(commandArgs).start();
@@ -203,21 +203,23 @@ public class ExecSource extends Abstract
while ((line = reader.readLine()) != null) {
counterGroup.incrementAndGet("exec.lines.read");
- Transaction transaction = channel.getTransaction();
- try {
- transaction.begin();
- Event event = EventBuilder.withBody(line.getBytes());
- channel.put(event);
- transaction.commit();
- } catch (ChannelException e) {
- transaction.rollback();
- throw e;
- } catch (Exception e) {
- transaction.rollback();
- throw e;
- }
- finally {
- transaction.close();
+ for (Channel channel : channels) {
+ Transaction transaction = channel.getTransaction();
+ try {
+ transaction.begin();
+ Event event = EventBuilder.withBody(line.getBytes());
+ channel.put(event);
+ transaction.commit();
+ } catch (ChannelException e) {
+ transaction.rollback();
+ throw e;
+ } catch (Exception e) {
+ transaction.rollback();
+ throw e;
+ }
+ finally {
+ transaction.close();
+ }
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Fri Jan 13 22:57:41 2012
@@ -29,11 +29,13 @@ import java.nio.channels.Channels;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
@@ -295,17 +297,20 @@ public class NetcatSource extends Abstra
event = EventBuilder.withBody(builder.toString().getBytes());
Exception ex = null;
- Transaction tx = source.getChannel().getTransaction();
+ List<Channel> channels = source.getChannels();
- try {
- tx.begin();
- source.getChannel().put(event);
- tx.commit();
- } catch (Exception e) {
- ex = e;
- tx.rollback();
- } finally {
- tx.close();
+ for (Channel channel : channels) {
+ Transaction tx = channel.getTransaction();
+ try {
+ tx.begin();
+ channel.put(event);
+ tx.commit();
+ } catch (Exception e) {
+ ex = e;
+ tx.rollback();
+ } finally {
+ tx.close();
+ }
}
if (ex == null) {
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java Fri Jan 13 22:57:41 2012
@@ -44,22 +44,23 @@ public class SequenceGeneratorSource ext
@Override
public Status process() throws EventDeliveryException {
- Channel channel = getChannel();
- Transaction transaction = channel.getTransaction();
- try {
- transaction.begin();
- channel.put(EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
- transaction.commit();
-
- counterGroup.incrementAndGet("events.successful");
- } catch (Exception e) {
- transaction.rollback();
- counterGroup.incrementAndGet("events.failed");
- } finally {
- transaction.close();
- }
+ for (Channel channel : getChannels()) {
+ Transaction transaction = channel.getTransaction();
+ try {
+ transaction.begin();
+ channel.put(EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
+ transaction.commit();
+
+ counterGroup.incrementAndGet("events.successful");
+ } catch (Exception e) {
+ transaction.rollback();
+ counterGroup.incrementAndGet("events.failed");
+ } finally {
+ transaction.close();
+ }
+ }
return Status.READY;
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+
+/**
+ * Enumeration of built in source types available in the system.
+ */
+public enum SourceType {
+
+ /**
+ * Place holder for custom sources not part of this enumeration.
+ */
+ OTHER(null),
+
+ /**
+ * Sequence generator file source.
+ * @see SequenceGeneratorSource
+ */
+ SEQ(SequenceGeneratorSource.class.getName()),
+
+ /**
+ * Netcat source.
+ * @see NetcatSource
+ */
+ NETCAT(NetcatSource.class.getName()),
+
+ /**
+ * Exec source.
+ * @see ExecSource
+ */
+ EXEC(ExecSource.class.getName()),
+
+ /**
+ * Avro soruce.
+ * @see AvroSource
+ */
+ AVRO(AvroSource.class.getName());
+
+ private final String sourceClassName;
+
+ private SourceType(String sourceClassName) {
+ this.sourceClassName = sourceClassName;
+ }
+
+ public String getSourceClassName() {
+ return sourceClassName;
+ }
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java Fri Jan 13 22:57:41 2012
@@ -19,6 +19,8 @@
package org.apache.flume.sink;
+import java.util.Map;
+
import org.apache.flume.Sink;
import org.apache.flume.SinkFactory;
import org.junit.Assert;
@@ -35,54 +37,55 @@ public class TestDefaultSinkFactory {
}
@Test
- public void testRegister() {
- Assert.assertEquals(0, sinkFactory.getSinkNames().size());
-
- sinkFactory.register("null", NullSink.class);
-
- Assert.assertEquals(1, sinkFactory.getSinkNames().size());
-
- Assert.assertEquals("null", sinkFactory.getSinkNames().iterator().next());
- }
-
- @Test
- public void testCreate() throws InstantiationException {
- Assert.assertEquals(0, sinkFactory.getSinkNames().size());
+ public void testDuplicateCreate() {
- sinkFactory.register("null", NullSink.class);
- Assert.assertEquals(1, sinkFactory.getSinkNames().size());
+ Sink avroSink1 = sinkFactory.create("avroSink1", "avro");
+ Sink avroSink2 = sinkFactory.create("avroSink2", "avro");
- Assert.assertEquals("null", sinkFactory.getSinkNames().iterator().next());
+ Assert.assertNotNull(avroSink1);
+ Assert.assertNotNull(avroSink2);
+ Assert.assertNotSame(avroSink1, avroSink2);
+ Assert.assertTrue(avroSink1 instanceof AvroSink);
+ Assert.assertTrue(avroSink2 instanceof AvroSink);
- Sink sink = sinkFactory.create("null");
+ Sink s1 = sinkFactory.create("avroSink1", "avro");
+ Sink s2 = sinkFactory.create("avroSink2", "avro");
- Assert.assertNotNull("Factory returned a null sink", sink);
- Assert.assertTrue("Source isn't an instance of NullSink",
- sink instanceof NullSink);
-
- sink = sinkFactory.create("i do not exist");
+ Assert.assertSame(avroSink1, s1);
+ Assert.assertSame(avroSink2, s2);
+ }
- Assert.assertNull("Factory returned a sink it shouldn't have", sink);
+ private void verifySinkCreation(String name, String type, Class<?> typeClass)
+ throws Exception {
+ Sink sink = sinkFactory.create(name, type);
+ Assert.assertNotNull(sink);
+ Assert.assertTrue(typeClass.isInstance(sink));
}
@Test
- public void testUnregister() {
- Assert.assertEquals(0, sinkFactory.getSinkNames().size());
-
- Assert.assertTrue("Registering a source returned false",
- sinkFactory.register("null", NullSink.class));
-
- Assert.assertEquals(1, sinkFactory.getSinkNames().size());
-
- Assert.assertEquals("null", sinkFactory.getSinkNames().iterator().next());
+ public void testSinkCreation() throws Exception {
+ verifySinkCreation("null-sink", "null", NullSink.class);
+ verifySinkCreation("logger-sink", "logger", LoggerSink.class);
+ verifySinkCreation("file-roll-sink", "file_roll", RollingFileSink.class);
+ verifySinkCreation("avro-sink", "avro", AvroSink.class);
+ }
- Assert.assertFalse("Unregistering an unknown sink returned true",
- sinkFactory.unregister("i do not exist"));
- Assert.assertTrue("Unregistering a sink returned false",
- sinkFactory.unregister("null"));
- Assert.assertEquals(0, sinkFactory.getSinkNames().size());
+ @Test
+ public void testSinkRegistry() {
+ Sink s1 = sinkFactory.create("s1", "avro");
+ Map<Class<?>, Map<String, Sink>> sr =
+ ((DefaultSinkFactory) sinkFactory).getRegistryClone();
+
+ Assert.assertEquals(1, sr.size());
+ Map<String, Sink> sinkMap = sr.get(AvroSink.class);
+ Assert.assertNotNull(sinkMap);
+ Assert.assertEquals(1, sinkMap.size());
+
+ Sink sink = sinkMap.get("s1");
+ Assert.assertNotNull(sink);
+ Assert.assertSame(s1, sink);
}
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import java.util.List;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Source;
+import org.apache.flume.lifecycle.LifecycleState;
+
+public class MockSource implements Source {
+
+ private String name;
+
+ public MockSource() {
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public LifecycleState getLifecycleState() {
+ return null;
+ }
+
+ @Override
+ public void setChannels(List<Channel> channels) {
+ }
+
+ @Override
+ public List<Channel> getChannels() {
+ return null;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java Fri Jan 13 22:57:41 2012
@@ -22,7 +22,9 @@ package org.apache.flume.source;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
@@ -60,7 +62,10 @@ public class TestAvroSource {
Configurables.configure(channel, new Context());
- source.setChannel(channel);
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+
+ source.setChannels(channels);
}
@Test
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java Fri Jan 13 22:57:41 2012
@@ -19,8 +19,12 @@
package org.apache.flume.source;
+import java.util.Map;
+
+import org.apache.flume.Channel;
import org.apache.flume.Source;
import org.apache.flume.SourceFactory;
+import org.apache.flume.lifecycle.LifecycleState;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -35,57 +39,56 @@ public class TestDefaultSourceFactory {
}
@Test
- public void testRegister() {
- Assert.assertEquals(0, sourceFactory.getSourceNames().size());
-
- sourceFactory.register("seq", SequenceGeneratorSource.class);
-
- Assert.assertEquals(1, sourceFactory.getSourceNames().size());
-
- Assert
- .assertEquals("seq", sourceFactory.getSourceNames().iterator().next());
- }
-
- @Test
- public void testCreate() throws InstantiationException {
- Assert.assertEquals(0, sourceFactory.getSourceNames().size());
-
- sourceFactory.register("seq", SequenceGeneratorSource.class);
+ public void testDuplicateCreate() {
- Assert.assertEquals(1, sourceFactory.getSourceNames().size());
+ Source avroSource1 = sourceFactory.create("avroSource1", "avro");
+ Source avroSource2 = sourceFactory.create("avroSource2", "avro");
- Assert
- .assertEquals("seq", sourceFactory.getSourceNames().iterator().next());
+ Assert.assertNotNull(avroSource1);
+ Assert.assertNotNull(avroSource2);
+ Assert.assertNotSame(avroSource1, avroSource2);
+ Assert.assertTrue(avroSource1 instanceof AvroSource);
+ Assert.assertTrue(avroSource2 instanceof AvroSource);
- Source source = sourceFactory.create("seq");
+ Source s1 = sourceFactory.create("avroSource1", "avro");
+ Source s2 = sourceFactory.create("avroSource2", "avro");
- Assert.assertNotNull("Factory returned a null source", source);
- Assert.assertTrue("Source isn't an instance of SequenceGeneratorSource",
- source instanceof SequenceGeneratorSource);
+ Assert.assertSame(avroSource1, s1);
+ Assert.assertSame(avroSource2, s2);
- source = sourceFactory.create("i do not exist");
+ }
- Assert.assertNull("Factory returned a source it shouldn't have", source);
+ private void verifySourceCreation(String name, String type,
+ Class<?> typeClass) throws Exception {
+ Source src = sourceFactory.create(name, type);
+ Assert.assertNotNull(src);
+ Assert.assertTrue(typeClass.isInstance(src));
}
@Test
- public void testUnregister() {
- Assert.assertEquals(0, sourceFactory.getSourceNames().size());
-
- Assert.assertTrue("Registering a source returned false",
- sourceFactory.register("seq", SequenceGeneratorSource.class));
-
- Assert.assertEquals(1, sourceFactory.getSourceNames().size());
-
- Assert
- .assertEquals("seq", sourceFactory.getSourceNames().iterator().next());
-
- Assert.assertFalse("Unregistering an unknown source returned true",
- sourceFactory.unregister("i do not exist"));
- Assert.assertTrue("Unregistering a source returned false",
- sourceFactory.unregister("seq"));
-
- Assert.assertEquals(0, sourceFactory.getSourceNames().size());
+ public void testSourceCreation() throws Exception {
+ verifySourceCreation("seq-src", "seq", SequenceGeneratorSource.class);
+ verifySourceCreation("netcat-src", "netcat", NetcatSource.class);
+ verifySourceCreation("exec-src", "exec", ExecSource.class);
+ verifySourceCreation("avro-src", "avro", AvroSource.class);
+ verifySourceCreation("custom-src", MockSource.class.getCanonicalName(),
+ MockSource.class);
}
+ @Test
+ public void testSourceRegistry() throws Exception {
+ Source s1 = sourceFactory.create("s1", "avro");
+ Map<Class<?>, Map<String, Source>> sr =
+ ((DefaultSourceFactory) sourceFactory).getRegistryClone();
+
+ Assert.assertEquals(1, sr.size());
+
+ Map<String, Source> srMap = sr.get(AvroSource.class);
+ Assert.assertNotNull(srMap);
+ Assert.assertEquals(1, srMap.size());
+
+ Source src = srMap.get("s1");
+ Assert.assertNotNull(src);
+ Assert.assertSame(s1, src);
+ }
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java Fri Jan 13 22:57:41 2012
@@ -22,6 +22,8 @@ package org.apache.flume.source;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.flume.Channel;
@@ -56,7 +58,10 @@ public class TestExecSource {
Configurables.configure(source, context);
Configurables.configure(channel, context);
- source.setChannel(channel);
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+
+ source.setChannels(channels);
source.start();
Transaction transaction = channel.getTransaction();
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java Fri Jan 13 22:57:41 2012
@@ -19,6 +19,7 @@
package org.apache.flume.source;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.flume.Channel;
@@ -57,14 +58,16 @@ public class TestPollableSourceRunner {
PollableSource source = new PollableSource() {
+ private String name;
+
@Override
- public Channel getChannel() {
+ public List<Channel> getChannels() {
// Doesn't matter.
return null;
}
@Override
- public void setChannel(Channel channel) {
+ public void setChannels(List<Channel> channel) {
// Doesn't matter.
}
@@ -110,6 +113,16 @@ public class TestPollableSourceRunner {
return null;
}
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
};
sourceRunner.setSource(source);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java Fri Jan 13 22:57:41 2012
@@ -16,9 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.flume.source;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -52,7 +54,10 @@ public class TestSequenceGeneratorSource
Configurables.configure(source, context);
Configurables.configure(channel, context);
- source.setChannel(channel);
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+
+ source.setChannels(channels);
for (long i = 0; i < 100; i++) {
source.process();
@@ -75,7 +80,10 @@ public class TestSequenceGeneratorSource
Configurables.configure(source, context);
Configurables.configure(channel, context);
- source.setChannel(channel);
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+
+ source.setChannels(channels);
source.start();
for (long i = 0; i < 100; i++) {