You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/07/30 03:11:50 UTC

[1/2] git commit: STREAMS-135 | Refactored provider to be reactive and not spin lock

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-135 [created] 28406d9c7


STREAMS-135 | Refactored provider to be reactive and not spin lock


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ace69cf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ace69cf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ace69cf3

Branch: refs/heads/STREAMS-135
Commit: ace69cf34f4561429b7ed3ab2402bc956bebe25e
Parents: cbfe01a
Author: mfranklin <mf...@apache.org>
Authored: Tue Jul 29 18:04:26 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Tue Jul 29 18:04:26 2014 -0400

----------------------------------------------------------------------
 .../processor/TwitterEventProcessor.java        | 56 ++++++---------
 .../provider/TwitterStreamProcessor.java        | 73 +++++++++++++++++++
 .../twitter/provider/TwitterStreamProvider.java | 75 ++++++++++++--------
 .../provider/TwitterStreamProviderTask.java     | 71 ------------------
 .../org/apache/streams/util/ComponentUtils.java | 32 +++++++--
 5 files changed, 165 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ace69cf3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index 1444705..3a42af9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -44,11 +44,12 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class TwitterEventProcessor implements StreamsProcessor, Runnable {
+public class TwitterEventProcessor implements StreamsProcessor, Callable<List<StreamsDatum>> {
 
     private final static String STREAMS_ID = "TwitterEventProcessor";
 
@@ -56,55 +57,38 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
 
     private ObjectMapper mapper = new StreamsTwitterMapper();
 
-    private BlockingQueue<String> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
     private Class inClass;
     private Class outClass;
+    private String item;
 
     private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
 
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
-        this.inQueue = inQueue;
-        this.outQueue = outQueue;
+    public TwitterEventProcessor(Class inClass, Class outClass, String item) {
         this.inClass = inClass;
         this.outClass = outClass;
+        this.item = item;
     }
 
-    public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
-        this.inQueue = inQueue;
-        this.outQueue = outQueue;
-        this.outClass = outClass;
+    public TwitterEventProcessor(Class inClass, Class outClass) {
+        this(inClass, outClass, null);
     }
 
-    public void run() {
-
-        while(true) {
-            String item;
-            try {
-
-                item = ComponentUtils.pollUntilStringNotEmpty(inQueue);
-
-                if(item instanceof String && item.equals(TERMINATE)) {
-                    LOGGER.info("Terminating!");
-                    break;
-                }
-
-                ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
-
-                StreamsDatum rawDatum = new StreamsDatum(objectNode);
-
-                for (StreamsDatum entry : process(rawDatum)) {
-                    ComponentUtils.offerUntilSuccess(entry, outQueue);
-                }
+    public TwitterEventProcessor( Class outClass) {
+        this(null, outClass, null);
+    }
 
-            } catch (Exception e) {
-                e.printStackTrace();
+    public TwitterEventProcessor( Class outClass, String item) {
+        this(null, outClass, item);
+    }
 
-            }
+    @Override
+    public List<StreamsDatum> call() throws Exception {
+        if(item != null) {
+            ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
+            StreamsDatum rawDatum = new StreamsDatum(objectNode);
+            return process(rawDatum);
         }
+        return Lists.newArrayList();
     }
 
     public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ace69cf3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
new file mode 100644
index 0000000..ec9ecc6
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
+import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import org.apache.streams.twitter.processor.TwitterEventProcessor;
+import org.apache.streams.util.ComponentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads messages from the Twitter stream and submits each one to a thread pool for conversion, queuing the resulting future on the provider.
+ */
+public class TwitterStreamProcessor extends StringDelimitedProcessor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProcessor.class);
+    private static final int DEFAULT_POOL_SIZE = 5;
+
+    private final TwitterStreamProvider provider;
+    private final ExecutorService service;
+
+    public TwitterStreamProcessor(TwitterStreamProvider provider) {
+        this(provider, DEFAULT_POOL_SIZE);
+    }
+
+    public TwitterStreamProcessor(TwitterStreamProvider provider, int poolSize) {
+        //We are only going to use the Hosebird processor to manage the extraction of the tweets from the Stream
+        super(null);
+        service = Executors.newFixedThreadPool(poolSize);
+        this.provider = provider;
+    }
+
+    // Waits for the next message, submits it to the pool for conversion and hands the future to the provider.
+    @Override
+    public boolean process() throws IOException, InterruptedException {
+        String msg = this.processNextMessage();
+        while(msg == null) {
+            Thread.sleep(10); // back off only while no message is available, never after a successful read
+            msg = this.processNextMessage();
+        }
+
+        return provider.addDatum(service.submit(new TwitterEventProcessor(String.class, msg)));
+    }
+
+    public void cleanUp() {
+        ComponentUtils.shutdownExecutor(service, 1, 30); // 1s graceful wait, then up to 30s after forcing shutdown
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ace69cf3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index f7c438c..2e82ea2 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -20,15 +20,13 @@ package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Constants;
 import com.twitter.hbc.core.Hosts;
 import com.twitter.hbc.core.HttpHosts;
 import com.twitter.hbc.core.endpoint.*;
-import com.twitter.hbc.core.processor.StringDelimitedProcessor;
 import com.twitter.hbc.httpclient.BasicClient;
 import com.twitter.hbc.httpclient.auth.Authentication;
 import com.twitter.hbc.httpclient.auth.BasicAuth;
@@ -38,7 +36,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.processor.TwitterEventProcessor;
+import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +46,7 @@ import java.math.BigInteger;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Created by sblackmon on 12/10/13.
@@ -68,17 +67,14 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
         this.config = config;
     }
 
-    protected BlockingQueue<String> hosebirdQueue;
-
-    protected volatile Queue<StreamsDatum> providerQueue;
+    protected volatile Queue<Future<List<StreamsDatum>>> providerQueue;
 
     protected Hosts hosebirdHosts;
     protected Authentication auth;
     protected StreamingEndpoint endpoint;
     protected BasicClient client;
-
-    protected ListeningExecutorService executor;
-
+    protected AtomicBoolean running = new AtomicBoolean(false);
+    protected TwitterStreamProcessor processor = new TwitterStreamProcessor(this);
     private DatumStatusCounter countersCurrent = new DatumStatusCounter();
     private DatumStatusCounter countersTotal = new DatumStatusCounter();
 
@@ -99,13 +95,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
     @Override
     public void startStream() {
-
-        for (int i = 0; i < 5; i++) {
-            executor.submit(new TwitterEventProcessor(hosebirdQueue, providerQueue, String.class));
-        }
-
-        new Thread(new TwitterStreamProviderTask(this)).start();
-
+        client.connect();
+        running.set(true);
     }
 
     @Override
@@ -114,12 +105,13 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
         StreamsResultSet current;
 
         synchronized( TwitterStreamProvider.class ) {
-            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+            Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
+            drainTo(drain);
+            current = new StreamsResultSet(drain);
             current.setCounter(new DatumStatusCounter());
             current.getCounter().add(countersCurrent);
             countersTotal.add(countersCurrent);
             countersCurrent = new DatumStatusCounter();
-            providerQueue.clear();
         }
 
         return current;
@@ -131,21 +123,18 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
     }
 
     @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end)
-    {
+    public StreamsResultSet readRange(DateTime start, DateTime end)  {
         throw new NotImplementedException();
     }
 
     @Override
     public boolean isRunning() {
-        return !executor.isShutdown() && !executor.isTerminated();
+        return this.running.get() && !client.isDone();
     }
 
     @Override
     public void prepare(Object o) {
 
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
         Preconditions.checkNotNull(config.getEndpoint());
 
         if(config.getEndpoint().equals("userstream") ) {
@@ -217,8 +206,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
         LOGGER.debug("host={}\tendpoint={}\taut={}", new Object[] {hosebirdHosts,endpoint,auth});
 
-        hosebirdQueue = new LinkedBlockingQueue<String>(1000);
-        providerQueue = new LinkedBlockingQueue<StreamsDatum>(1000);
+        providerQueue = new LinkedBlockingQueue<Future<List<StreamsDatum>>>(1000);
 
         client = new ClientBuilder()
             .name("apache/streams/streams-contrib/streams-provider-twitter")
@@ -226,20 +214,47 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
             .endpoint(endpoint)
             .authentication(auth)
             .connectionTimeout(1200000)
-            .processor(new StringDelimitedProcessor(hosebirdQueue))
+            .processor(processor)
             .build();
 
     }
 
     @Override
     public void cleanUp() {
-        for (int i = 0; i < 5; i++) {
-            hosebirdQueue.add(TwitterEventProcessor.TERMINATE);
-        }
+        this.client.stop();
+        this.processor.cleanUp();
+        this.running.set(false); // stream is stopped: clear the flag that startStream() set
     }
 
     @Override
     public DatumStatusCounter getDatumStatusCounter() {
         return countersTotal;
     }
+
+    protected boolean addDatum(Future<List<StreamsDatum>> future) {
+        ComponentUtils.offerUntilSuccess(future, providerQueue);
+        return true;
+    }
+
+    protected void drainTo(Queue<StreamsDatum> drain) {
+        while(!providerQueue.isEmpty()) {
+            for(StreamsDatum datum : pollForDatum()) {
+                ComponentUtils.offerUntilSuccess(datum, drain);
+            }
+        }
+    }
+
+    protected List<StreamsDatum> pollForDatum()  {
+        try {
+            return providerQueue.poll().get(); // blocks until the queued conversion task has finished
+        } catch (InterruptedException e) {
+            LOGGER.warn("Interrupted while waiting for future.  Initiate shutdown.");
+            this.cleanUp();
+            Thread.currentThread().interrupt(); // restore the interrupt status for callers
+            return Lists.newArrayList();
+        } catch (ExecutionException e) {
+            LOGGER.warn("Error getting tweet from future", e); // keep the cause so conversion failures are diagnosable
+            return Lists.newArrayList();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ace69cf3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java
deleted file mode 100644
index 13c5d21..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.twitter.hbc.ClientBuilder;
-import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import com.twitter.hbc.core.processor.StringDelimitedProcessor;
-import com.twitter.hbc.httpclient.BasicClient;
-import com.twitter.hbc.httpclient.auth.Authentication;
-import com.twitter.hbc.httpclient.auth.OAuth1;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.*;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterStreamProviderTask implements Runnable {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProviderTask.class);
-
-    private TwitterStreamProvider provider;
-
-    public TwitterStreamProviderTask(TwitterStreamProvider provider) {
-        this.provider = provider;
-    }
-
-    @Override
-    public void run() {
-
-        provider.client.connect();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ace69cf3/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index 6dd6cc7..f6a0b60 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -19,21 +19,24 @@
 package org.apache.streams.util;
 
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
- * Created by sblackmon on 3/31/14.
+ * Common utilities for Streams components.
  */
 public class ComponentUtils {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ComponentUtils.class);
 
-    public static void offerUntilSuccess(Object entry, Queue queue) {
+    public static <T> void offerUntilSuccess(T entry, Queue<T> queue) {
 
         boolean success;
         do {
-            synchronized( ComponentUtils.class ) {
-                success = queue.offer(entry);
-            }
+            success = queue.offer(entry);
             Thread.yield();
         }
         while( !success );
@@ -55,4 +58,23 @@ public class ComponentUtils {
         return result;
     }
 
+    public static void shutdownExecutor(ExecutorService stream, int initialWait, int secondaryWait) {
+        stream.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!stream.awaitTermination(initialWait, TimeUnit.SECONDS)) {
+                stream.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!stream.awaitTermination(secondaryWait, TimeUnit.SECONDS)) {
+                    LOGGER.error("Executor Service did not terminate");
+                }
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            stream.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
 }


[2/2] git commit: STREAMS-135 | Adjusted checks for config so that the sample endpoint can be used

Posted by mf...@apache.org.
STREAMS-135 | Adjusted checks for config so that the sample endpoint can be used


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/28406d9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/28406d9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/28406d9c

Branch: refs/heads/STREAMS-135
Commit: 28406d9c732e4871096606b95b71f86261cff31a
Parents: ace69cf
Author: mfranklin <mf...@apache.org>
Authored: Tue Jul 29 19:46:23 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Tue Jul 29 19:46:23 2014 -0400

----------------------------------------------------------------------
 .../twitter/provider/TwitterStreamProvider.java       | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/28406d9c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 2e82ea2..9e1ce45 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -151,17 +151,17 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
             hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
 
-            Optional<List<String>> track = Optional.fromNullable(config.getTrack());
-            Optional<List<Long>> follow = Optional.fromNullable(config.getFollow());
+            boolean track = config.getTrack() != null && !config.getTrack().isEmpty();
+            boolean follow = config.getFollow() != null && !config.getFollow().isEmpty();
 
-            if( track.isPresent() || follow.isPresent() ) {
+            if( track || follow ) {
                 LOGGER.debug("***\tPRESENT\t***");
                 StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
-                if( track.isPresent() ) {
-                    statusesFilterEndpoint.trackTerms(track.get());
+                if( track ) {
+                    statusesFilterEndpoint.trackTerms(config.getTrack());
                 }
-                else {
-                    statusesFilterEndpoint.followings(follow.get());
+                if( follow ) {
+                    statusesFilterEndpoint.followings(config.getFollow());
                 }
                 this.endpoint = statusesFilterEndpoint;
             } else {