You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/01/03 03:24:27 UTC
git commit: FLUME-2275. Improve scalability of MorphlineInterceptor under contention
Updated Branches:
refs/heads/trunk 3b1034e82 -> c3a9c80ab
FLUME-2275. Improve scalability of MorphlineInterceptor under contention
(Wolfgang Hoschek via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c3a9c80a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c3a9c80a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c3a9c80a
Branch: refs/heads/trunk
Commit: c3a9c80ab431f6ba670142c7ce6813692422f764
Parents: 3b1034e
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Jan 2 18:22:31 2014 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Jan 2 18:23:37 2014 -0800
----------------------------------------------------------------------
.../solr/morphline/MorphlineInterceptor.java | 19 +++++++------------
1 file changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/c3a9c80a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
index 8e5e4b3..ef8f716 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
@@ -23,18 +23,18 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;
-
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Fields;
+
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
@@ -47,7 +47,7 @@ import com.google.common.io.ByteStreams;
public class MorphlineInterceptor implements Interceptor {
private final Context context;
- private final BlockingQueue<LocalMorphlineInterceptor> pool = new LinkedBlockingQueue();
+ private final Queue<LocalMorphlineInterceptor> pool = new ConcurrentLinkedQueue<LocalMorphlineInterceptor>();
protected MorphlineInterceptor(Context context) {
Preconditions.checkNotNull(context);
@@ -61,9 +61,8 @@ public class MorphlineInterceptor implements Interceptor {
@Override
public void close() {
- List<LocalMorphlineInterceptor> interceptors = new ArrayList();
- pool.drainTo(interceptors);
- for (LocalMorphlineInterceptor interceptor : interceptors) {
+ LocalMorphlineInterceptor interceptor;
+ while ((interceptor = pool.poll()) != null) {
interceptor.close();
}
}
@@ -85,11 +84,7 @@ public class MorphlineInterceptor implements Interceptor {
}
private void returnToPool(LocalMorphlineInterceptor interceptor) {
- try {
- pool.put(interceptor);
- } catch (InterruptedException e) {
- throw new FlumeException(e);
- }
+ pool.add(interceptor);
}
private LocalMorphlineInterceptor borrowFromPool() {