You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/01/13 23:57:42 UTC

svn commit: r1231371 [1/2] - in /incubator/flume/branches/flume-728: flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ flume-ng-channels/f...

Author: arvind
Date: Fri Jan 13 22:57:41 2012
New Revision: 1231371

URL: http://svn.apache.org/viewvc?rev=1231371&view=rev
Log:
FLUME-932. Making Flume-NG components name-aware and pluggable.


Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java   (with props)
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java   (with props)
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java   (with props)
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java   (with props)
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java   (with props)
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java   (with props)
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java   (with props)
Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Source.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/FanoutChannel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java Fri Jan 13 22:57:41 2012
@@ -31,6 +31,7 @@ import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
+import org.apache.flume.channel.AbstractChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +43,7 @@ import com.google.common.base.Preconditi
  * its storage.
  * </p>
  */
-public class FileChannel implements Channel {
+public class FileChannel extends AbstractChannel {
 
   private static final Logger logger = LoggerFactory
       .getLogger(FileChannel.class);
@@ -158,12 +159,6 @@ public class FileChannel implements Chan
   }
 
   @Override
-  public void shutdown() {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
   public String getName() {
     // TODO Auto-generated method stub
     return null;

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java Fri Jan 13 22:57:41 2012
@@ -17,23 +17,21 @@
  */
 package org.apache.flume.channel.jdbc;
 
-import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
+import org.apache.flume.channel.AbstractChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 /**
  * <p>A JDBC based channel implementation.</p>
  */
-public class JdbcChannel implements Channel, Configurable {
+public class JdbcChannel extends AbstractChannel {
 
   private static final Logger LOG = LoggerFactory.getLogger(JdbcChannel.class);
 
   private JdbcChannelProvider provider;
-  private String name;
 
   /**
    * Instantiates a new JDBC Channel.
@@ -57,15 +55,11 @@ public class JdbcChannel implements Chan
   }
 
   @Override
-  public void shutdown() {
-    JdbcChannelProviderFactory.releaseProvider(name);
+  public void stop() {
+    JdbcChannelProviderFactory.releaseProvider(getName());
     provider = null;
-    name = null;
-  }
 
-  @Override
-  public String getName() {
-    return name;
+    super.stop();
   }
 
   private JdbcChannelProvider getProvider() {
@@ -74,11 +68,8 @@ public class JdbcChannel implements Chan
 
   @Override
   public void configure(Context context) {
-    // FIXME - allow name to be specified via the context
-    this.name = "jdbc";
-
-    provider = JdbcChannelProviderFactory.getProvider(context, name);
+    provider = JdbcChannelProviderFactory.getProvider(context, getName());
 
-    LOG.info("JDBC Channel initialized: " + name);
+    LOG.info("JDBC Channel initialized: " + getName());
   }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Fri Jan 13 22:57:41 2012
@@ -248,6 +248,8 @@ public class JdbcChannelProviderImpl imp
         tx.close();
       }
     }
+
+    LOGGER.info("Persistend event: " + persistableEvent.getEventId());
   }
 
   @Override
@@ -271,6 +273,13 @@ public class JdbcChannelProviderImpl imp
         tx.close();
       }
     }
+
+    if (result != null) {
+      LOGGER.info("Removed event: " + ((PersistableEvent) result).getEventId());
+    } else {
+      LOGGER.info("No event found for removal");
+    }
+
     return result;
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java Fri Jan 13 22:57:41 2012
@@ -107,7 +107,7 @@ public class JdbcTransactionImpl impleme
           LOGGER.info("Attempting transaction roll-back");
           connection.rollback();
         } else {
-          LOGGER.info("Attempting transaction commit");
+          LOGGER.debug("Attempting transaction commit");
           connection.commit();
         }
       } catch (SQLException ex) {

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/PersistableEvent.java Fri Jan 13 22:57:41 2012
@@ -85,6 +85,10 @@ public class PersistableEvent implements
     this.eventId = eventId;
   }
 
+  protected long getEventId() {
+    return this.eventId;
+  }
+
   public List<HeaderEntry> getHeaderEntries() {
     return headers;
   }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java Fri Jan 13 22:57:41 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.flume;
 
+import org.apache.flume.lifecycle.LifecycleAware;
+
 /**
  * <p>
  * A channel connects a <tt>Source</tt> to a <tt>Sink</tt>. The source
@@ -42,7 +44,7 @@ package org.apache.flume;
  * @see org.apache.flume.EventSink
  * @see org.apache.flume.Transaction
  */
-public interface Channel {
+public interface Channel extends LifecycleAware, NamedComponent {
 
   /**
    * <p>Puts the given event in the channel.</p>
@@ -73,16 +75,4 @@ public interface Channel {
    * @return the transaction instance associated with this channel.
    */
   public Transaction getTransaction();
-
-  /**
-   * Instructs the channel to release any resources held in preparation of
-   * shutting down.
-   */
-  public void shutdown();
-
-  /**
-   * @return the channel name.
-   * @return
-   */
-  public String getName();
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java Fri Jan 13 22:57:41 2012
@@ -17,18 +17,11 @@
  */
 package org.apache.flume;
 
-import java.util.Map;
-import java.util.Set;
 
 public interface ChannelFactory {
 
-  public boolean register(String name, Class<? extends Channel> channelClass);
+  public Channel create(String name, String type) throws FlumeException;
 
-  public boolean unregister(String name);
+  public boolean unregister(Channel channel);
 
-  public Channel create(String name) throws InstantiationException;
-
-  public Channel createFanout(String chList, Map<String, Channel> chMap) throws InstantiationException;
-
-  public Set<String> getChannelNames();
 }

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume;
+
+/**
+ * Base class of all flume exceptions.
+ */
+public class FlumeException extends RuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  public FlumeException(String msg) {
+    super(msg);
+  }
+
+  public FlumeException(String msg, Throwable th) {
+    super(msg, th);
+  }
+
+  public FlumeException(Throwable th) {
+    super(th);
+  }
+
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume;
+
+public interface NamedComponent {
+
+  public void setName(String name);
+
+  public String getName();
+
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Sink.java Fri Jan 13 22:57:41 2012
@@ -21,7 +21,7 @@ package org.apache.flume;
 
 import org.apache.flume.lifecycle.LifecycleAware;
 
-public interface Sink extends LifecycleAware {
+public interface Sink extends LifecycleAware, NamedComponent {
 
   public void setChannel(Channel channel);
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java Fri Jan 13 22:57:41 2012
@@ -19,16 +19,12 @@
 
 package org.apache.flume;
 
-import java.util.Set;
 
 public interface SinkFactory {
 
-  public boolean register(String name, Class<? extends Sink> sinkClass);
+  public Sink create(String name, String type)
+      throws FlumeException;
 
-  public boolean unregister(String name);
-
-  public Sink create(String name) throws InstantiationException;
-
-  public Set<String> getSinkNames();
+  public boolean unregister(Sink sink);
 
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Source.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Source.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Source.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Source.java Fri Jan 13 22:57:41 2012
@@ -19,12 +19,13 @@
 
 package org.apache.flume;
 
-import org.apache.flume.lifecycle.LifecycleAware;
+import java.util.List;
 
-public interface Source extends LifecycleAware {
+import org.apache.flume.lifecycle.LifecycleAware;
 
-  public void setChannel(Channel channel);
+public interface Source extends LifecycleAware, NamedComponent {
 
-  public Channel getChannel();
+  public void setChannels(List<Channel> channels);
 
+  public List<Channel> getChannels();
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java Fri Jan 13 22:57:41 2012
@@ -19,17 +19,11 @@
 
 package org.apache.flume;
 
-import java.util.Set;
 
 public interface SourceFactory {
 
-  public boolean register(String sourceName,
-      Class<? extends Source> sourceClass);
-
-  public boolean unregister(String sourceName);
-
-  public Source create(String sourceName) throws InstantiationException;
-
-  public Set<String> getSourceNames();
+  public Source create(String sourceName, String type)
+      throws FlumeException;
 
+  public boolean unregister(Source source);
 }

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+
+public abstract class AbstractChannel
+    implements Channel, LifecycleAware, Configurable {
+
+  private String name;
+
+  private LifecycleState lifecycleState;
+
+  public AbstractChannel() {
+    lifecycleState = LifecycleState.IDLE;
+  }
+
+  @Override
+  public synchronized void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public synchronized void start() {
+    lifecycleState = LifecycleState.START;
+  }
+
+  @Override
+  public synchronized void stop() {
+    lifecycleState = LifecycleState.STOP;
+  }
+
+  @Override
+  public synchronized LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  @Override
+  public synchronized String getName() {
+    return name;
+  }
+
+  @Override
+  public void configure(Context context) {
+
+  }
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel;
+
+/**
+ * Enumeration of built in channel types available in the system.
+ */
+public enum ChannelType {
+
+  /**
+   * Place holder for custom channels not part of this enumeration.
+   */
+  OTHER(null),
+
+  /**
+   * Memory channel
+   * @see MemoryChannel
+   */
+  MEMORY(MemoryChannel.class.getName()),
+
+  /**
+   * Fan-out channel
+   * @see FanoutChannel
+   */
+  FAN_OUT(FanoutChannel.class.getName()),
+
+  /**
+   * JDBC channel provided by org.apache.flume.channel.jdbc.JdbcChannel
+   */
+  JDBC("org.apache.flume.channel.jdbc.JdbcChannel");
+
+  private final String channelClassName;
+
+  private ChannelType(String channelClassName) {
+    this.channelClassName = channelClassName;
+  }
+
+  public String getChannelClassName() {
+    return channelClassName;
+  }
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java Fri Jan 13 22:57:41 2012
@@ -21,11 +21,10 @@ package org.apache.flume.channel;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelFactory;
+import org.apache.flume.FlumeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,83 +35,93 @@ public class DefaultChannelFactory imple
   private static final Logger logger = LoggerFactory
       .getLogger(DefaultChannelFactory.class);
 
-  private Map<String, Class<? extends Channel>> channelRegistry;
+  private Map<Class<?>, Map<String, Channel>> channels;
 
   public DefaultChannelFactory() {
-    channelRegistry = new HashMap<String, Class<? extends Channel>>();
+    channels = new HashMap<Class<?>, Map<String, Channel>>();
   }
 
   @Override
-  public boolean register(String name, Class<? extends Channel> channelClass) {
-    logger.info("Register channel name:{} class:{}", name, channelClass);
+  public boolean unregister(Channel channel) {
+    Preconditions.checkNotNull(channel);
+    logger.info("Unregister channel {}", channel);
+    boolean removed = false;
 
-    if (channelRegistry.containsKey(name)) {
-      return false;
-    }
-
-    channelRegistry.put(name, channelClass);
-    return true;
-  }
+    Map<String, Channel> channelMap = channels.get(channel.getClass());
+    if (channelMap != null) {
+      removed = (channelMap.remove(channel.getName()) != null);
 
-  @Override
-  public boolean unregister(String name) {
-    logger.info("Unregister channel class:{}", name);
-
-    return channelRegistry.remove(name) != null;
-  }
+      if (channelMap.size() == 0) {
+        channels.remove(channel.getClass());
+      }
+    }
 
-  @Override
-  public Set<String> getChannelNames() {
-    return channelRegistry.keySet();
+    return removed;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public Channel create(String name) throws InstantiationException {
+  public Channel create(String name, String type) throws FlumeException {
     Preconditions.checkNotNull(name);
+    Preconditions.checkNotNull(type);
+    logger.debug("Creating instance of channel {} type {}", name, type);
 
-    logger.debug("Creating instance of channel {}", name);
+    String channelClassName = type;
 
-    if (!channelRegistry.containsKey(name)) {
-      return null;
+    ChannelType channelType = ChannelType.OTHER;
+    try {
+      channelType = ChannelType.valueOf(type.toUpperCase());
+    } catch (IllegalArgumentException ex) {
+      logger.debug("Channel type {} is a custom type", type);
     }
 
-    Channel channel = null;
+    if (!channelType.equals(ChannelType.OTHER)) {
+      channelClassName = channelType.getChannelClassName();
+    }
 
+    Class<? extends Channel> channelClass = null;
     try {
-      channel = channelRegistry.get(name).newInstance();
-    } catch (IllegalAccessException e) {
-      throw new InstantiationException("Unable to create channel " + name
-          + " due to " + e.getMessage());
+      channelClass = (Class<? extends Channel>) Class.forName(channelClassName);
+    } catch (Exception ex) {
+      throw new FlumeException("Unable to load channel type: " + type
+          + ", class: " + channelClassName, ex);
     }
 
-    return channel;
-  }
+    Map<String, Channel> channelMap = channels.get(channelClass);
+    if (channelMap == null) {
+      channelMap = new HashMap<String, Channel>();
+      channels.put(channelClass, channelMap);
+    }
 
-  @Override
-  public String toString() {
-    return "{ channelRegistry:" + channelRegistry + " }";
-  }
+    Channel channel = channelMap.get(name);
 
-  public Map<String, Class<? extends Channel>> getChannelRegistry() {
-    return channelRegistry;
-  }
+    if (channel == null) {
+      try {
+        channel = channelClass.newInstance();
+        channel.setName(name);
+        channelMap.put(name, channel);
+      } catch (Exception ex) {
+        // Clean up channel map
+        channels.remove(channelClass);
+        throw new FlumeException("Unable to create channel: " + name
+            + ", type: " + type + ", class: " + channelClassName, ex);
+      }
+    }
 
-  public void setChannelRegistry(
-      Map<String, Class<? extends Channel>> channelRegistry) {
-    this.channelRegistry = channelRegistry;
+    return channel;
   }
 
-  // build a fanout channel from the list of channel names and map of <name,channels>
-  @Override
-  public Channel createFanout(String chList, Map<String, Channel> chMap)
-      throws InstantiationException {
-
-    FanoutChannel fnc = (FanoutChannel)create("fanout");
-    StringTokenizer tk = new StringTokenizer(chList, ",");
-    while (tk.hasMoreTokens()) {
-      fnc.addFanout(chMap.get(tk.nextToken()));
+  public Map<Class<?>, Map<String, Channel>> getRegistryClone() {
+    Map<Class<?>, Map<String, Channel>> result =
+        new HashMap<Class<?>, Map<String, Channel>>();
+
+    for (Class<?> klass : channels.keySet()) {
+      Map<String, Channel> channelMap = channels.get(klass);
+      Map<String, Channel> resultChannelMap = new HashMap<String, Channel>();
+      resultChannelMap.putAll(channelMap);
+      result.put(klass, resultChannelMap);
     }
-    return fnc;
-  }
 
+    return result;
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/FanoutChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/FanoutChannel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/FanoutChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/FanoutChannel.java Fri Jan 13 22:57:41 2012
@@ -23,20 +23,22 @@ import java.util.LinkedList;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
+import org.apache.flume.lifecycle.LifecycleState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FanoutChannel implements Channel {
+public class FanoutChannel extends AbstractChannel {
   private final Logger logger = LoggerFactory
       .getLogger(FanoutChannel.class);
 
   /**
    * A wrapper transaction that does the operation on all channels.
    * Note that there's no two phase commit. If one of the channels
-   * throws an exception, we still execute the operations for the 
-   * rest. All the failed commits are rolled back at the end to 
+   * throws an exception, we still execute the operations for the
+   * rest. All the failed commits are rolled back at the end to
    * maintain consistent transaction state.
    * Note that the currently commit and rollback are not throwing
    * exceptions * even if one of the underlying transactions fail.
@@ -156,15 +158,11 @@ public class FanoutChannel implements Ch
   }
 
   @Override
-  public void shutdown() {
+  public void stop() {
     for (Channel ch : channelList) {
-      ch.shutdown();
+      ch.stop();
     }
-  }
 
-  @Override
-  public String getName() {
-    return "SourceFanout";
+    super.stop();
   }
-
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Fri Jan 13 22:57:41 2012
@@ -42,9 +42,9 @@ import com.google.common.base.Preconditi
  * common queue. Channel has a marker for the last committed event in order to
  * avoid sink reading uncommitted data. The transactions keep track of the
  * actions to perform undo when rolled back.
- * 
+ *
  */
-public class MemoryChannel implements Channel, Configurable {
+public class MemoryChannel extends AbstractChannel {
 
   private static final Integer defaultCapacity = 50;
   private static final Integer defaultKeepAlive = 3;
@@ -88,8 +88,8 @@ public class MemoryChannel implements Ch
     }
 
     @Override
-    /** 
-     * Start the transaction 
+    /**
+     * Start the transaction
      *  initialize the undo lists, stamps
      *  set transaction state to Started
      */
@@ -107,7 +107,7 @@ public class MemoryChannel implements Ch
     @Override
     /**
      * Commit the transaction
-     *  If there was an event added by this transaction, then set the 
+     *  If there was an event added by this transaction, then set the
      *  commit stamp set transaction state to Committed
      */
     public void commit() {
@@ -149,7 +149,7 @@ public class MemoryChannel implements Ch
     }
 
     @Override
-    /** 
+    /**
      * Close the transaction
      *  if the transaction is still open, then roll it back
      *  set transaction state to Closed
@@ -243,7 +243,7 @@ public class MemoryChannel implements Ch
   }
 
   @Override
-  /** 
+  /**
    * Add the given event to the end of the queue
    * save the event in the undoPut queue for possible rollback
    * save the stamp of this put for commit
@@ -270,7 +270,7 @@ public class MemoryChannel implements Ch
   /**
    * undo of put for all the events in the undoPut queue, remove those from the
    * event queue
-   * 
+   *
    * @param myTxn
    */
   protected void undoPut(MemTransaction myTxn) {
@@ -326,7 +326,7 @@ public class MemoryChannel implements Ch
   /**
    * undo of take operation for each event in the undoTake list, add it back to
    * the event queue
-   * 
+   *
    * @param myTxn
    */
   protected void undoTake(MemTransaction myTxn) {
@@ -357,7 +357,7 @@ public class MemoryChannel implements Ch
 
   /**
    * Remove the given transaction from the list of open transactions
-   * 
+   *
    * @param myTxn
    */
   protected void forgetTransaction(MemTransaction myTxn) {
@@ -374,16 +374,4 @@ public class MemoryChannel implements Ch
       return null;
     }
   }
-
-  @Override
-  public void shutdown() {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public String getName() {
-    // TODO Auto-generated method stub
-    return null;
-  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java Fri Jan 13 22:57:41 2012
@@ -77,7 +77,7 @@ import com.google.common.base.Preconditi
  * TODO
  * </p>
  */
-public class PseudoTxnMemoryChannel implements Channel, Configurable {
+public class PseudoTxnMemoryChannel extends AbstractChannel {
 
   private static final Integer defaultCapacity = 50;
   private static final Integer defaultKeepAlive = 3;
@@ -163,16 +163,4 @@ public class PseudoTxnMemoryChannel impl
     public void close() {
     }
   }
-
-  @Override
-  public void shutdown() {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public String getName() {
-    // TODO Auto-generated method stub
-    return null;
-  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java Fri Jan 13 22:57:41 2012
@@ -29,6 +29,7 @@ import com.google.common.base.Preconditi
 abstract public class AbstractSink implements Sink, LifecycleAware {
 
   private Channel channel;
+  private String name;
 
   private LifecycleState lifecycleState;
 
@@ -62,4 +63,14 @@ abstract public class AbstractSink imple
     return lifecycleState;
   }
 
+  @Override
+  public synchronized void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public synchronized String getName() {
+    return name;
+  }
+
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java Fri Jan 13 22:57:41 2012
@@ -21,8 +21,8 @@ package org.apache.flume.sink;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
+import org.apache.flume.FlumeException;
 import org.apache.flume.Sink;
 import org.apache.flume.SinkFactory;
 import org.slf4j.Logger;
@@ -35,70 +35,95 @@ public class DefaultSinkFactory implemen
   private static final Logger logger = LoggerFactory
       .getLogger(DefaultSinkFactory.class);
 
-  public Map<String, Class<? extends Sink>> sinkRegistry;
+  private final Map<Class<?>, Map<String, Sink>> sinks;
 
   public DefaultSinkFactory() {
-    sinkRegistry = new HashMap<String, Class<? extends Sink>>();
+    sinks = new HashMap<Class<?>, Map<String, Sink>>();
   }
 
   @Override
-  public boolean register(String name, Class<? extends Sink> sinkClass) {
-    logger.info("Register sink name:{} class:{}", name, sinkClass);
-
-    if (sinkRegistry.containsKey(name)) {
-      return false;
+  public synchronized boolean unregister(Sink sink) {
+    Preconditions.checkNotNull(sink);
+    boolean removed = false;
+
+    logger.debug("Unregistering sink {}", sink);
+
+    Map<String, Sink> sinkMap = sinks.get(sink.getClass());
+    if (sinkMap != null) {
+      removed = (sinkMap.remove(sink.getName()) != null);
+
+      if (sinkMap.size() == 0) {
+        sinks.remove(sink.getClass());
+      }
     }
 
-    sinkRegistry.put(name, sinkClass);
-    return true;
+    return removed;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public boolean unregister(String name) {
-    logger.info("Unregister source class:{}", name);
+  public Sink create(String name, String type)
+      throws FlumeException {
+    Preconditions.checkNotNull(name);
+    Preconditions.checkNotNull(type);
+    logger.info("Creating instance of sink {} type{}", name, type);
 
-    return sinkRegistry.remove(name) != null;
-  }
+    String sinkClassName = type;
 
-  @Override
-  public Set<String> getSinkNames() {
-    return sinkRegistry.keySet();
-  }
+    SinkType sinkType = SinkType.OTHER;
+    try {
+      sinkType = SinkType.valueOf(type.toUpperCase());
+    } catch (IllegalArgumentException ex) {
+      logger.debug("Sink type {} is a custom type", type);
+    }
 
-  @Override
-  public Sink create(String name) throws InstantiationException {
-    Preconditions.checkNotNull(name);
+    if (!sinkType.equals(SinkType.OTHER)) {
+      sinkClassName = sinkType.getSinkClassName();
+    }
 
-    logger.debug("Creating instance of sink {}", name);
+    Class<? extends Sink> sinkClass = null;
+    try {
+      sinkClass = (Class<? extends Sink>) Class.forName(sinkClassName);
+    } catch (Exception ex) {
+      throw new FlumeException("Unable to load sink type: " + type
+          + ", class: " + sinkClassName, ex);
+    }
 
-    if (!sinkRegistry.containsKey(name)) {
-      return null;
+    Map<String, Sink> sinkMap = sinks.get(sinkClass);
+    if (sinkMap == null) {
+      sinkMap = new HashMap<String, Sink>();
+      sinks.put(sinkClass, sinkMap);
     }
 
-    Sink sink = null;
+    Sink sink = sinkMap.get(name);
 
-    try {
-      sink = sinkRegistry.get(name).newInstance();
-    } catch (IllegalAccessException e) {
-      throw new InstantiationException("Unable to create sink " + name
-          + " due to " + e.getMessage());
+    if (sink == null) {
+      try {
+        sink = sinkClass.newInstance();
+        sink.setName(name);
+        sinkMap.put(name,  sink);
+      } catch (Exception ex) {
+        // Clean up the sink map
+        sinks.remove(sinkClass);
+        throw new FlumeException("Unable to create sink: " + name
+            + ", type: " + type + ", class: " + sinkClassName, ex);
+      }
     }
 
     return sink;
   }
 
-  @Override
-  public String toString() {
-    return "{ sinkRegistry:" + sinkRegistry + " }";
-  }
-
-  public Map<String, Class<? extends Sink>> getSinkRegistry() {
-    return sinkRegistry;
-  }
+  public synchronized Map<Class<?>, Map<String, Sink>> getRegistryClone() {
+    Map<Class<?>, Map<String, Sink>> result =
+        new HashMap<Class<?>, Map<String, Sink>>();
+
+    for (Class<?> klass : sinks.keySet()) {
+      Map<String, Sink> sinkMap = sinks.get(klass);
+      Map<String, Sink> resultSinkMap = new HashMap<String, Sink>();
+      resultSinkMap.putAll(sinkMap);
+      result.put(klass, resultSinkMap);
+    }
 
-  public void setSinkRegistry(
-      Map<String, Class<? extends Sink>> sinkRegistry) {
-    this.sinkRegistry = sinkRegistry;
+    return result;
   }
-
 }

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink;
+
+
+/**
+ * Enumeration of built in sink types available in the system.
+ */
+public enum SinkType {
+
+  /**
+   * Placeholder for custom sinks that are not part of this enumeration.
+   */
+  OTHER(null),
+
+
+  /**
+   * Null sink
+   * @see NullSink
+   */
+  NULL(NullSink.class.getName()),
+
+  /**
+   * Logger sink
+   * @see LoggerSink
+   */
+  LOGGER(LoggerSink.class.getName()),
+
+  /**
+   * Rolling file sink
+   * @see RollingFileSink
+   */
+  FILE_ROLL(RollingFileSink.class.getName()),
+
+  /**
+   * HDFS Sink provided by org.apache.flume.sink.hdfs.HDFSEventSink
+   */
+  HDFS("org.apache.flume.sink.hdfs.HDFSEventSink"),
+
+  /**
+   * Avro sink
+   * @see AvroSink
+   */
+  AVRO(AvroSink.class.getName());
+
+  private final String sinkClassName;
+
+  private SinkType(String sinkClassName) {
+    this.sinkClassName = sinkClassName;
+  }
+
+  public String getSinkClassName() {
+    return sinkClassName;
+  }
+
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java Fri Jan 13 22:57:41 2012
@@ -19,6 +19,9 @@
 
 package org.apache.flume.source;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.flume.Channel;
 import org.apache.flume.Source;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -27,37 +30,51 @@ import com.google.common.base.Preconditi
 
 abstract public class AbstractSource implements Source {
 
-  private Channel channel;
+  private List<Channel> channels;
+  private String name;
 
   private LifecycleState lifecycleState;
 
   public AbstractSource() {
+    channels = new ArrayList<Channel>();
     lifecycleState = LifecycleState.IDLE;
   }
 
   @Override
-  public void start() {
-    Preconditions.checkState(channel != null, "No channel configured");
+  public synchronized void start() {
+    Preconditions.checkState(channels != null, "No channel configured");
 
     lifecycleState = LifecycleState.START;
   }
 
   @Override
-  public void stop() {
+  public synchronized void stop() {
     lifecycleState = LifecycleState.STOP;
   }
 
-  public Channel getChannel() {
-    return channel;
+  @Override
+  public synchronized List<Channel> getChannels() {
+    return channels;
   }
 
-  public void setChannel(Channel channel) {
-    this.channel = channel;
+  @Override
+  public synchronized void setChannels(List<Channel> channels) {
+    this.channels.clear();
+    this.channels.addAll(channels);
   }
 
   @Override
-  public LifecycleState getLifecycleState() {
+  public synchronized LifecycleState getLifecycleState() {
     return lifecycleState;
   }
 
+  @Override
+  public synchronized void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public synchronized String getName() {
+    return name;
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Fri Jan 13 22:57:41 2012
@@ -161,30 +161,33 @@ public class AvroSource extends Abstract
 
     counterGroup.incrementAndGet("rpc.received");
 
-    Channel channel = getChannel();
-    Transaction transaction = channel.getTransaction();
+    List<Channel> channels = getChannels();
 
-    try {
-      transaction.begin();
+    for (Channel channel : channels) {
+      Transaction transaction = channel.getTransaction();
 
-      Map<String, String> headers = new HashMap<String, String>();
+      try {
+        transaction.begin();
 
-      for (Entry<CharSequence, CharSequence> entry : avroEvent.headers
-          .entrySet()) {
+        Map<String, String> headers = new HashMap<String, String>();
 
-        headers.put(entry.getKey().toString(), entry.getValue().toString());
-      }
+        for (Entry<CharSequence, CharSequence> entry : avroEvent.headers
+            .entrySet()) {
+
+          headers.put(entry.getKey().toString(), entry.getValue().toString());
+        }
+
+        Event event = EventBuilder.withBody(avroEvent.body.array(), headers);
+        channel.put(event);
+        counterGroup.incrementAndGet("rpc.events");
 
-      Event event = EventBuilder.withBody(avroEvent.body.array(), headers);
-      channel.put(event);
-      counterGroup.incrementAndGet("rpc.events");
-
-      transaction.commit();
-    } catch (ChannelException e) {
-      transaction.rollback();
-      return Status.FAILED;
-    } finally {
-      transaction.close();
+        transaction.commit();
+      } catch (ChannelException e) {
+        transaction.rollback();
+        return Status.FAILED;
+      } finally {
+        transaction.close();
+      }
     }
 
     counterGroup.incrementAndGet("rpc.successful");
@@ -196,32 +199,35 @@ public class AvroSource extends Abstract
   public Status appendBatch(List<AvroFlumeEvent> events) {
     counterGroup.incrementAndGet("rpc.received.batch");
 
-    Channel channel = getChannel();
-    Transaction transaction = channel.getTransaction();
+    List<Channel> channels = getChannels();
 
-    try {
-      transaction.begin();
+    for (Channel channel : channels) {
+      Transaction transaction = channel.getTransaction();
 
-      for (AvroFlumeEvent avroEvent : events) {
-        Map<String, String> headers = new HashMap<String, String>();
+      try {
+        transaction.begin();
 
-        for (Entry<CharSequence, CharSequence> entry : avroEvent.headers
-            .entrySet()) {
+        for (AvroFlumeEvent avroEvent : events) {
+          Map<String, String> headers = new HashMap<String, String>();
 
-          headers.put(entry.getKey().toString(), entry.getValue().toString());
+          for (Entry<CharSequence, CharSequence> entry : avroEvent.headers
+              .entrySet()) {
+
+            headers.put(entry.getKey().toString(), entry.getValue().toString());
+          }
+
+          Event event = EventBuilder.withBody(avroEvent.body.array(), headers);
+          channel.put(event);
+          counterGroup.incrementAndGet("rpc.events");
         }
 
-        Event event = EventBuilder.withBody(avroEvent.body.array(), headers);
-        channel.put(event);
-        counterGroup.incrementAndGet("rpc.events");
+        transaction.commit();
+      } catch (ChannelException e) {
+        transaction.rollback();
+        return Status.FAILED;
+      } finally {
+        transaction.close();
       }
-
-      transaction.commit();
-    } catch (ChannelException e) {
-      transaction.rollback();
-      return Status.FAILED;
-    } finally {
-      transaction.close();
     }
 
     counterGroup.incrementAndGet("rpc.successful");

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java Fri Jan 13 22:57:41 2012
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.flume.source;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
+import org.apache.flume.FlumeException;
 import org.apache.flume.Source;
 import org.apache.flume.SourceFactory;
 import org.slf4j.Logger;
@@ -35,71 +34,100 @@ public class DefaultSourceFactory implem
   private static final Logger logger = LoggerFactory
       .getLogger(DefaultSourceFactory.class);
 
-  public Map<String, Class<? extends Source>> sourceRegistry;
+  /**
+   * Cache of sources created thus far. The outer map is keyed on the source
+   * type and the inner map is keyed on source name.
+   */
+  private final Map<Class<?>, Map<String, Source>> sources;
 
   public DefaultSourceFactory() {
-    sourceRegistry = new HashMap<String, Class<? extends Source>>();
+    sources = new HashMap<Class<?>, Map<String, Source>>();
   }
 
   @Override
-  public boolean register(String name, Class<? extends Source> sourceClass) {
-    logger.info("Register source name:{} class:{}", name, sourceClass);
-
-    if (sourceRegistry.containsKey(name)) {
-      return false;
+  public synchronized boolean unregister(Source source) {
+    Preconditions.checkNotNull(source);
+    boolean removed = false;
+
+    logger.debug("Unregistering source {}", source);
+
+    Map<String, Source> sourceMap = sources.get(source.getClass());
+    if (sourceMap != null) {
+      removed = (sourceMap.remove(source.getName()) != null);
+
+      if (sourceMap.size() == 0) {
+        sources.remove(source.getClass());
+      }
     }
 
-    sourceRegistry.put(name, sourceClass);
-    return true;
+    return removed;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public boolean unregister(String name) {
-    logger.info("Unregister source class:{}", name);
+  public synchronized Source create(String name, String type)
+      throws FlumeException {
+    Preconditions.checkNotNull(name);
+    Preconditions.checkNotNull(type);
 
-    return sourceRegistry.remove(name) != null;
-  }
+    logger.debug("Creating instance of source {}, type {}", name, type);
 
-  @Override
-  public Set<String> getSourceNames() {
-    return sourceRegistry.keySet();
-  }
+    String sourceClassName = type;
 
-  @Override
-  public Source create(String name) throws InstantiationException {
-    Preconditions.checkNotNull(name);
+    SourceType srcType = SourceType.OTHER;
+    try {
+      srcType = SourceType.valueOf(type.toUpperCase());
+    } catch (IllegalArgumentException ex) {
+      logger.debug("Source type {} is a custom type", type);
+    }
 
-    logger.debug("Creating instance of source {}", name);
+    if (!srcType.equals(SourceType.OTHER)) {
+      sourceClassName = srcType.getSourceClassName();
+    }
 
-    /* FIXME: Is returning null really a good idea? Should we just panic? */
-    if (!sourceRegistry.containsKey(name)) {
-      return null;
+    Class<? extends Source> sourceClass = null;
+    try {
+      sourceClass = (Class<? extends Source>) Class.forName(sourceClassName);
+    } catch (Exception ex) {
+      throw new FlumeException("Unable to load source type: " + type
+          + ", class: " + sourceClassName, ex);
     }
 
-    Source source = null;
+    Map<String, Source> sourceMap = sources.get(sourceClass);
+    if (sourceMap == null) {
+      sourceMap = new HashMap<String, Source>();
+      sources.put(sourceClass, sourceMap);
+    }
 
-    try {
-      source = sourceRegistry.get(name).newInstance();
-    } catch (IllegalAccessException e) {
-      throw new InstantiationException("Unable to create source " + name
-          + " due to " + e.getMessage());
+    Source source = sourceMap.get(name);
+
+    if (source == null) {
+      try {
+        source = sourceClass.newInstance();
+        source.setName(name);
+        sourceMap.put(name, source);
+      } catch (Exception ex) {
+        // Clean up the source map
+        sources.remove(sourceClass);
+        throw new FlumeException("Unable to create source: " + name
+            +", type: " + type + ", class: " + sourceClassName, ex);
+      }
     }
 
     return source;
   }
 
-  @Override
-  public String toString() {
-    return "{ sinkRegistry:" + sourceRegistry + " }";
-  }
-
-  public Map<String, Class<? extends Source>> getSourceRegistry() {
-    return sourceRegistry;
-  }
+  public synchronized Map<Class<?>, Map<String, Source>> getRegistryClone() {
+    Map<Class<?>, Map<String, Source>> result =
+        new HashMap<Class<?>, Map<String, Source>>();
+
+    for (Class<?> klass : sources.keySet()) {
+      Map<String, Source> srcMap = sources.get(klass);
+      Map<String, Source> resultSrcMap = new HashMap<String, Source>();
+      resultSrcMap.putAll(srcMap);
+      result.put(klass, resultSrcMap);
+    }
 
-  public void setSourceRegistry(
-      Map<String, Class<? extends Source>> sourceRegistry) {
-    this.sourceRegistry = sourceRegistry;
+    return result;
   }
-
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Fri Jan 13 22:57:41 2012
@@ -20,8 +20,8 @@
 package org.apache.flume.source;
 
 import java.io.BufferedReader;
-import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -130,7 +130,7 @@ public class ExecSource extends Abstract
     ExecRunnable runner = new ExecRunnable();
 
     runner.command = command;
-    runner.channel = getChannel();
+    runner.channels = getChannels();
     runner.counterGroup = counterGroup;
 
     // FIXME: Use a callback-like executor / future to signal us upon failure.
@@ -163,8 +163,8 @@ public class ExecSource extends Abstract
       try {
         executor.awaitTermination(500, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
-        logger
-            .debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
+        logger.debug("Interrupted while waiting for exec executor service "
+            + "to stop. Just exiting.");
         Thread.currentThread().interrupt();
       }
     }
@@ -186,12 +186,12 @@ public class ExecSource extends Abstract
   private static class ExecRunnable implements Runnable {
 
     private String command;
-    private Channel channel;
+    private List<Channel> channels;
     private CounterGroup counterGroup;
 
     @Override
     public void run() {
-      
+
       try {
         String[] commandArgs = command.split("\\s+");
         Process process = new ProcessBuilder(commandArgs).start();
@@ -203,21 +203,23 @@ public class ExecSource extends Abstract
         while ((line = reader.readLine()) != null) {
           counterGroup.incrementAndGet("exec.lines.read");
 
-          Transaction transaction = channel.getTransaction();
-          try {
-            transaction.begin();
-            Event event = EventBuilder.withBody(line.getBytes());
-            channel.put(event);
-            transaction.commit();
-          } catch (ChannelException e) {
-            transaction.rollback();
-            throw e;
-          } catch (Exception e) {
-            transaction.rollback();
-            throw e;
-          } 
-          finally {
-            transaction.close();
+          for (Channel channel : channels) {
+            Transaction transaction = channel.getTransaction();
+            try {
+              transaction.begin();
+              Event event = EventBuilder.withBody(line.getBytes());
+              channel.put(event);
+              transaction.commit();
+            } catch (ChannelException e) {
+              transaction.rollback();
+              throw e;
+            } catch (Exception e) {
+              transaction.rollback();
+              throw e;
+            }
+            finally {
+              transaction.close();
+            }
           }
         }
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Fri Jan 13 22:57:41 2012
@@ -29,11 +29,13 @@ import java.nio.channels.Channels;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
@@ -295,17 +297,20 @@ public class NetcatSource extends Abstra
         event = EventBuilder.withBody(builder.toString().getBytes());
         Exception ex = null;
 
-        Transaction tx = source.getChannel().getTransaction();
+        List<Channel> channels = source.getChannels();
 
-        try {
-          tx.begin();
-          source.getChannel().put(event);
-          tx.commit();
-        } catch (Exception e) {
-          ex = e;
-          tx.rollback();
-        } finally {
-          tx.close();
+        for (Channel channel : channels) {
+          Transaction tx = channel.getTransaction();
+          try {
+            tx.begin();
+            channel.put(event);
+            tx.commit();
+          } catch (Exception e) {
+            ex = e;
+            tx.rollback();
+          } finally {
+            tx.close();
+          }
         }
 
         if (ex == null) {

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java Fri Jan 13 22:57:41 2012
@@ -44,22 +44,23 @@ public class SequenceGeneratorSource ext
 
   @Override
   public Status process() throws EventDeliveryException {
-    Channel channel = getChannel();
-    Transaction transaction = channel.getTransaction();
 
-    try {
-      transaction.begin();
-      channel.put(EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
-      transaction.commit();
-
-      counterGroup.incrementAndGet("events.successful");
-    } catch (Exception e) {
-      transaction.rollback();
-      counterGroup.incrementAndGet("events.failed");
-    } finally {
-      transaction.close();
-    }
+    for (Channel channel : getChannels()) {
+      Transaction transaction = channel.getTransaction();
 
+      try {
+        transaction.begin();
+        channel.put(EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
+        transaction.commit();
+
+        counterGroup.incrementAndGet("events.successful");
+      } catch (Exception e) {
+        transaction.rollback();
+        counterGroup.incrementAndGet("events.failed");
+      } finally {
+        transaction.close();
+      }
+    }
     return Status.READY;
   }
 

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+
+/**
+ * Enumeration of built in source types available in the system.
+ */
+public enum SourceType {
+
+  /**
+   * Place holder for custom sources not part of this enumeration.
+   */
+  OTHER(null),
+
+  /**
+   * Sequence generator source.
+   * @see SequenceGeneratorSource
+   */
+  SEQ(SequenceGeneratorSource.class.getName()),
+
+  /**
+   * Netcat source.
+   * @see NetcatSource
+   */
+  NETCAT(NetcatSource.class.getName()),
+
+  /**
+   * Exec source.
+   * @see ExecSource
+   */
+  EXEC(ExecSource.class.getName()),
+
+  /**
+   * Avro source.
+   * @see AvroSource
+   */
+  AVRO(AvroSource.class.getName());
+
+  private final String sourceClassName;
+
+  private SourceType(String sourceClassName) {
+    this.sourceClassName = sourceClassName;
+  }
+
+  public String getSourceClassName() {
+    return sourceClassName;
+  }
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SourceType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java Fri Jan 13 22:57:41 2012
@@ -19,6 +19,8 @@
 
 package org.apache.flume.sink;
 
+import java.util.Map;
+
 import org.apache.flume.Sink;
 import org.apache.flume.SinkFactory;
 import org.junit.Assert;
@@ -35,54 +37,55 @@ public class TestDefaultSinkFactory {
   }
 
   @Test
-  public void testRegister() {
-    Assert.assertEquals(0, sinkFactory.getSinkNames().size());
-
-    sinkFactory.register("null", NullSink.class);
-
-    Assert.assertEquals(1, sinkFactory.getSinkNames().size());
-
-    Assert.assertEquals("null", sinkFactory.getSinkNames().iterator().next());
-  }
-
-  @Test
-  public void testCreate() throws InstantiationException {
-    Assert.assertEquals(0, sinkFactory.getSinkNames().size());
+  public void testDuplicateCreate() {
 
-    sinkFactory.register("null", NullSink.class);
 
-    Assert.assertEquals(1, sinkFactory.getSinkNames().size());
+    Sink avroSink1 = sinkFactory.create("avroSink1", "avro");
+    Sink avroSink2 = sinkFactory.create("avroSink2", "avro");
 
-    Assert.assertEquals("null", sinkFactory.getSinkNames().iterator().next());
+    Assert.assertNotNull(avroSink1);
+    Assert.assertNotNull(avroSink2);
+    Assert.assertNotSame(avroSink1, avroSink2);
+    Assert.assertTrue(avroSink1 instanceof AvroSink);
+    Assert.assertTrue(avroSink2 instanceof AvroSink);
 
-    Sink sink = sinkFactory.create("null");
+    Sink s1 = sinkFactory.create("avroSink1", "avro");
+    Sink s2 = sinkFactory.create("avroSink2", "avro");
 
-    Assert.assertNotNull("Factory returned a null sink", sink);
-    Assert.assertTrue("Source isn't an instance of NullSink",
-        sink instanceof NullSink);
-
-    sink = sinkFactory.create("i do not exist");
+    Assert.assertSame(avroSink1, s1);
+    Assert.assertSame(avroSink2, s2);
+  }
 
-    Assert.assertNull("Factory returned a sink it shouldn't have", sink);
+  private void verifySinkCreation(String name, String type, Class<?> typeClass)
+    throws Exception {
+    Sink sink = sinkFactory.create(name, type);
+    Assert.assertNotNull(sink);
+    Assert.assertTrue(typeClass.isInstance(sink));
   }
 
   @Test
-  public void testUnregister() {
-    Assert.assertEquals(0, sinkFactory.getSinkNames().size());
-
-    Assert.assertTrue("Registering a source returned false",
-        sinkFactory.register("null", NullSink.class));
-
-    Assert.assertEquals(1, sinkFactory.getSinkNames().size());
-
-    Assert.assertEquals("null", sinkFactory.getSinkNames().iterator().next());
+  public void testSinkCreation() throws Exception {
+    verifySinkCreation("null-sink", "null", NullSink.class);
+    verifySinkCreation("logger-sink", "logger", LoggerSink.class);
+    verifySinkCreation("file-roll-sink", "file_roll", RollingFileSink.class);
+    verifySinkCreation("avro-sink", "avro", AvroSink.class);
+  }
 
-    Assert.assertFalse("Unregistering an unknown sink returned true",
-        sinkFactory.unregister("i do not exist"));
-    Assert.assertTrue("Unregistering a sink returned false",
-        sinkFactory.unregister("null"));
 
-    Assert.assertEquals(0, sinkFactory.getSinkNames().size());
+  @Test
+  public void testSinkRegistry() {
+    Sink s1 = sinkFactory.create("s1", "avro");
+    Map<Class<?>, Map<String, Sink>> sr =
+        ((DefaultSinkFactory) sinkFactory).getRegistryClone();
+
+    Assert.assertEquals(1, sr.size());
+    Map<String, Sink> sinkMap = sr.get(AvroSink.class);
+    Assert.assertNotNull(sinkMap);
+    Assert.assertEquals(1, sinkMap.size());
+
+    Sink sink = sinkMap.get("s1");
+    Assert.assertNotNull(sink);
+    Assert.assertSame(s1, sink);
   }
 
 }

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java?rev=1231371&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java Fri Jan 13 22:57:41 2012
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import java.util.List;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Source;
+import org.apache.flume.lifecycle.LifecycleState;
+
+public class MockSource implements Source {
+
+  private String name;
+
+  public MockSource() {
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return null;
+  }
+
+  @Override
+  public void setChannels(List<Channel> channels) {
+  }
+
+  @Override
+  public List<Channel> getChannels() {
+    return null;
+  }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/MockSource.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java Fri Jan 13 22:57:41 2012
@@ -22,7 +22,9 @@ package org.apache.flume.source;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
@@ -60,7 +62,10 @@ public class TestAvroSource {
 
     Configurables.configure(channel, new Context());
 
-    source.setChannel(channel);
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    source.setChannels(channels);
   }
 
   @Test

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java Fri Jan 13 22:57:41 2012
@@ -19,8 +19,12 @@
 
 package org.apache.flume.source;
 
+import java.util.Map;
+
+import org.apache.flume.Channel;
 import org.apache.flume.Source;
 import org.apache.flume.SourceFactory;
+import org.apache.flume.lifecycle.LifecycleState;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,57 +39,56 @@ public class TestDefaultSourceFactory {
   }
 
   @Test
-  public void testRegister() {
-    Assert.assertEquals(0, sourceFactory.getSourceNames().size());
-
-    sourceFactory.register("seq", SequenceGeneratorSource.class);
-
-    Assert.assertEquals(1, sourceFactory.getSourceNames().size());
-
-    Assert
-        .assertEquals("seq", sourceFactory.getSourceNames().iterator().next());
-  }
-
-  @Test
-  public void testCreate() throws InstantiationException {
-    Assert.assertEquals(0, sourceFactory.getSourceNames().size());
-
-    sourceFactory.register("seq", SequenceGeneratorSource.class);
+  public void testDuplicateCreate()  {
 
-    Assert.assertEquals(1, sourceFactory.getSourceNames().size());
+    Source avroSource1 = sourceFactory.create("avroSource1", "avro");
+    Source avroSource2 = sourceFactory.create("avroSource2", "avro");
 
-    Assert
-        .assertEquals("seq", sourceFactory.getSourceNames().iterator().next());
+    Assert.assertNotNull(avroSource1);
+    Assert.assertNotNull(avroSource2);
+    Assert.assertNotSame(avroSource1, avroSource2);
+    Assert.assertTrue(avroSource1 instanceof AvroSource);
+    Assert.assertTrue(avroSource2 instanceof AvroSource);
 
-    Source source = sourceFactory.create("seq");
+    Source s1 = sourceFactory.create("avroSource1", "avro");
+    Source s2 = sourceFactory.create("avroSource2", "avro");
 
-    Assert.assertNotNull("Factory returned a null source", source);
-    Assert.assertTrue("Source isn't an instance of SequenceGeneratorSource",
-        source instanceof SequenceGeneratorSource);
+    Assert.assertSame(avroSource1, s1);
+    Assert.assertSame(avroSource2, s2);
 
-    source = sourceFactory.create("i do not exist");
+  }
 
-    Assert.assertNull("Factory returned a source it shouldn't have", source);
+  private void verifySourceCreation(String name, String type,
+      Class<?> typeClass) throws Exception {
+    Source src = sourceFactory.create(name, type);
+    Assert.assertNotNull(src);
+    Assert.assertTrue(typeClass.isInstance(src));
   }
 
   @Test
-  public void testUnregister() {
-    Assert.assertEquals(0, sourceFactory.getSourceNames().size());
-
-    Assert.assertTrue("Registering a source returned false",
-        sourceFactory.register("seq", SequenceGeneratorSource.class));
-
-    Assert.assertEquals(1, sourceFactory.getSourceNames().size());
-
-    Assert
-        .assertEquals("seq", sourceFactory.getSourceNames().iterator().next());
-
-    Assert.assertFalse("Unregistering an unknown source returned true",
-        sourceFactory.unregister("i do not exist"));
-    Assert.assertTrue("Unregistering a source returned false",
-        sourceFactory.unregister("seq"));
-
-    Assert.assertEquals(0, sourceFactory.getSourceNames().size());
+  public void testSourceCreation() throws Exception {
+    verifySourceCreation("seq-src", "seq", SequenceGeneratorSource.class);
+    verifySourceCreation("netcat-src", "netcat", NetcatSource.class);
+    verifySourceCreation("exec-src", "exec", ExecSource.class);
+    verifySourceCreation("avro-src", "avro", AvroSource.class);
+    verifySourceCreation("custom-src", MockSource.class.getCanonicalName(),
+        MockSource.class);
   }
 
+  @Test
+  public void testSourceRegistry() throws Exception {
+    Source s1 = sourceFactory.create("s1", "avro");
+    Map<Class<?>, Map<String, Source>> sr =
+        ((DefaultSourceFactory) sourceFactory).getRegistryClone();
+
+    Assert.assertEquals(1, sr.size());
+
+    Map<String, Source> srMap = sr.get(AvroSource.class);
+    Assert.assertNotNull(srMap);
+    Assert.assertEquals(1, srMap.size());
+
+    Source src = srMap.get("s1");
+    Assert.assertNotNull(src);
+    Assert.assertSame(s1, src);
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java Fri Jan 13 22:57:41 2012
@@ -22,6 +22,8 @@ package org.apache.flume.source;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Channel;
@@ -56,7 +58,10 @@ public class TestExecSource {
     Configurables.configure(source, context);
     Configurables.configure(channel, context);
 
-    source.setChannel(channel);
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    source.setChannels(channels);
     source.start();
     Transaction transaction = channel.getTransaction();
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java Fri Jan 13 22:57:41 2012
@@ -19,6 +19,7 @@
 
 package org.apache.flume.source;
 
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.flume.Channel;
@@ -57,14 +58,16 @@ public class TestPollableSourceRunner {
 
     PollableSource source = new PollableSource() {
 
+      private String name;
+
       @Override
-      public Channel getChannel() {
+      public List<Channel> getChannels() {
         // Doesn't matter.
         return null;
       }
 
       @Override
-      public void setChannel(Channel channel) {
+      public void setChannels(List<Channel> channel) {
         // Doesn't matter.
       }
 
@@ -110,6 +113,16 @@ public class TestPollableSourceRunner {
         return null;
       }
 
+      @Override
+      public void setName(String name) {
+        this.name = name;
+      }
+
+      @Override
+      public String getName() {
+        return name;
+      }
+
     };
 
     sourceRunner.setSource(source);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java Fri Jan 13 22:57:41 2012
@@ -16,9 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.flume.source;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -52,7 +54,10 @@ public class TestSequenceGeneratorSource
     Configurables.configure(source, context);
     Configurables.configure(channel, context);
 
-    source.setChannel(channel);
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    source.setChannels(channels);
 
     for (long i = 0; i < 100; i++) {
       source.process();
@@ -75,7 +80,10 @@ public class TestSequenceGeneratorSource
     Configurables.configure(source, context);
     Configurables.configure(channel, context);
 
-    source.setChannel(channel);
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    source.setChannels(channels);
     source.start();
 
     for (long i = 0; i < 100; i++) {