You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/01/03 03:24:27 UTC

git commit: FLUME-2275. Improve scalability of MorphlineInterceptor under contention

Updated Branches:
  refs/heads/trunk 3b1034e82 -> c3a9c80ab


FLUME-2275. Improve scalability of MorphlineInterceptor under contention

(Wolfgang Hoschek via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c3a9c80a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c3a9c80a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c3a9c80a

Branch: refs/heads/trunk
Commit: c3a9c80ab431f6ba670142c7ce6813692422f764
Parents: 3b1034e
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Jan 2 18:22:31 2014 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Jan 2 18:23:37 2014 -0800

----------------------------------------------------------------------
 .../solr/morphline/MorphlineInterceptor.java     | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c3a9c80a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
index 8e5e4b3..ef8f716 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
@@ -23,18 +23,18 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.interceptor.Interceptor;
-
 import org.kitesdk.morphline.api.Command;
 import org.kitesdk.morphline.api.Record;
 import org.kitesdk.morphline.base.Fields;
+
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteStreams;
 
@@ -47,7 +47,7 @@ import com.google.common.io.ByteStreams;
 public class MorphlineInterceptor implements Interceptor {
 
   private final Context context;
-  private final BlockingQueue<LocalMorphlineInterceptor> pool = new LinkedBlockingQueue();
+  private final Queue<LocalMorphlineInterceptor> pool = new ConcurrentLinkedQueue<LocalMorphlineInterceptor>();
   
   protected MorphlineInterceptor(Context context) {
     Preconditions.checkNotNull(context);
@@ -61,9 +61,8 @@ public class MorphlineInterceptor implements Interceptor {
 
   @Override
   public void close() {
-    List<LocalMorphlineInterceptor> interceptors = new ArrayList();
-    pool.drainTo(interceptors);
-    for (LocalMorphlineInterceptor interceptor : interceptors) {
+    LocalMorphlineInterceptor interceptor;
+    while ((interceptor = pool.poll()) != null) {
       interceptor.close();
     }
   }
@@ -85,11 +84,7 @@ public class MorphlineInterceptor implements Interceptor {
   }
 
   private void returnToPool(LocalMorphlineInterceptor interceptor) {
-    try {
-      pool.put(interceptor);
-    } catch (InterruptedException e) {
-      throw new FlumeException(e);
-    }
+    pool.add(interceptor);
   }
   
   private LocalMorphlineInterceptor borrowFromPool() {