You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/09/23 19:10:13 UTC

[2/4] kudu git commit: [twitter-demo] use AUTO_FLUSH_BACKGROUND session

[twitter-demo] use AUTO_FLUSH_BACKGROUND session

Changed the twitter demo application to use AUTO_FLUSH_BACKGROUND
flush mode instead of MANUAL_FLUSH mode.

Change-Id: I497c1265df132fc8ea4e635475d0d669eca21646
Reviewed-on: http://gerrit.cloudera.org:8080/4477
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fd3a05ca
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fd3a05ca
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fd3a05ca

Branch: refs/heads/master
Commit: fd3a05ca12fec0b803cedbf15c03fd498724ee42
Parents: 376f95b
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue Sep 20 08:55:04 2016 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Fri Sep 23 19:08:12 2016 +0000

----------------------------------------------------------------------
 src/kudu/twitter-demo/CMakeLists.txt     | 89 +++++++++++++++------------
 src/kudu/twitter-demo/insert_consumer.cc | 61 +++---------------
 src/kudu/twitter-demo/insert_consumer.h  | 26 +-------
 3 files changed, 61 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fd3a05ca/src/kudu/twitter-demo/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/twitter-demo/CMakeLists.txt b/src/kudu/twitter-demo/CMakeLists.txt
index 5c261e2..484f2d1 100644
--- a/src/kudu/twitter-demo/CMakeLists.txt
+++ b/src/kudu/twitter-demo/CMakeLists.txt
@@ -15,50 +15,59 @@
 # specific language governing permissions and limitations
 # under the License.
 
-find_library(LIBOAUTH_LIBRARY NAMES oauth)
-if(NOT LIBOAUTH_LIBRARY)
-  message(WARNING "liboauth not found on system. Skipping twitter demo")
+# Use pkgconfig to configure the build regarding liboauth. This makes it
+# possible to extract info on include and library paths, etc. The liboauth
+# library is installed at an alternative location on MacOS X.
+find_package(PkgConfig)
+if (NOT PKG_CONFIG_FOUND)
+  message(WARNING "pkgconfig not found. Skipping twitter demo.")
 else()
+  pkg_search_module(LIBOAUTH oauth)
+  if(NOT LIBOAUTH_FOUND)
+    message(WARNING "liboauth not found. Skipping twitter demo.")
+  else()
+    include_directories(SYSTEM ${LIBOAUTH_INCLUDE_DIRS})
+    link_directories(${LIBOAUTH_LIBRARY_DIRS})
+    add_library(twitter_demo
+      oauth.cc
+      parser.cc
+      insert_consumer.cc
+      twitter_streamer.cc)
 
-  add_library(twitter_demo
-    oauth.cc
-    parser.cc
-    insert_consumer.cc
-    twitter_streamer.cc)
+    target_link_libraries(twitter_demo
+      gutil
+      kudu_util
+      kudu_test_util)
 
-  target_link_libraries(twitter_demo
-    gutil
-    kudu_util
-    kudu_test_util)
+    target_link_libraries(twitter_demo
+      kudu_client
+      ${LIBOAUTH_LIBRARIES}
+      ${CURL_LIBRARIES}
+      ${KUDU_BASE_LIBS})
 
-  target_link_libraries(twitter_demo
-    kudu_client
-    ${LIBOAUTH_LIBRARY}
-    ${CURL_LIBRARIES}
-    ${KUDU_BASE_LIBS})
+    # Require that the tserver protobuf code is generated first
+    add_dependencies(twitter_demo
+      tserver_proto)
 
-  # Require that the tserver protobuf code is generated first
-  add_dependencies(twitter_demo
-    tserver_proto)
-
-  add_executable(ingest_firehose ingest_firehose.cc)
-  target_link_libraries(ingest_firehose
-    twitter_demo)
-
-  # Tests
-  ADD_KUDU_TEST(oauth-test)
-  # parser-test relies on symlinked data files which we can't currently copy correctly
-  # to the cluster.
-  ADD_KUDU_TEST(parser-test LABELS no_dist_test)
-  if(NOT "${NO_TESTS}")
-    target_link_libraries(oauth-test
+    add_executable(ingest_firehose ingest_firehose.cc)
+    target_link_libraries(ingest_firehose
       twitter_demo)
-    target_link_libraries(parser-test
-      twitter_demo)
-    execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-tweets.txt
-      ${EXECUTABLE_OUTPUT_PATH})
-    execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-deletes.txt
-      ${EXECUTABLE_OUTPUT_PATH})
-  endif()
 
-endif() # library checks
+    # Tests
+    ADD_KUDU_TEST(oauth-test)
+    # parser-test relies on symlinked data files which we can't currently copy correctly
+    # to the cluster.
+    ADD_KUDU_TEST(parser-test LABELS no_dist_test)
+    if(NOT "${NO_TESTS}")
+      target_link_libraries(oauth-test
+        twitter_demo)
+      target_link_libraries(parser-test
+        twitter_demo)
+      execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-tweets.txt
+        ${EXECUTABLE_OUTPUT_PATH})
+      execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-deletes.txt
+        ${EXECUTABLE_OUTPUT_PATH})
+    endif()
+
+  endif() # if(NOT LIBOAUTH_FOUND) ... else ...
+endif() # if (NOT PKG_CONFIG_FOUND) ... else ...

http://git-wip-us.apache.org/repos/asf/kudu/blob/fd3a05ca/src/kudu/twitter-demo/insert_consumer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/twitter-demo/insert_consumer.cc b/src/kudu/twitter-demo/insert_consumer.cc
index 2371f7a..361b8bd 100644
--- a/src/kudu/twitter-demo/insert_consumer.cc
+++ b/src/kudu/twitter-demo/insert_consumer.cc
@@ -17,20 +17,21 @@
 
 #include "kudu/twitter-demo/insert_consumer.h"
 
-#include <glog/logging.h>
+#include <ctime>
 #include <mutex>
 #include <string>
-#include <time.h>
 #include <vector>
 
+#include <glog/logging.h>
+
+#include "kudu/client/client.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/row.h"
 #include "kudu/common/schema.h"
-#include "kudu/client/client.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/stl_util.h"
-#include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/twitter-demo/parser.h"
 #include "kudu/twitter-demo/twitter-schema.h"
 #include "kudu/util/status.h"
@@ -38,38 +39,19 @@
 namespace kudu {
 namespace twitter_demo {
 
-using tserver::TabletServerServiceProxy;
-using tserver::WriteRequestPB;
-using tserver::WriteResponsePB;
-using rpc::RpcController;
 using kudu::client::KuduInsert;
 using kudu::client::KuduClient;
 using kudu::client::KuduSession;
-using kudu::client::KuduStatusCallback;
-using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
 
-FlushCB::FlushCB(InsertConsumer* consumer)
-  : consumer_(consumer) {
-}
-
-FlushCB::~FlushCB() {
-}
-
-void FlushCB::Run(const Status& status) {
-  consumer_->BatchFinished(status);
-}
-
-InsertConsumer::InsertConsumer(const client::sp::shared_ptr<KuduClient> &client)
+InsertConsumer::InsertConsumer(client::sp::shared_ptr<KuduClient> client)
   : initted_(false),
     schema_(CreateTwitterSchema()),
-    flush_cb_(this),
-    client_(client),
-    request_pending_(false) {
+    client_(std::move(client)) {
 }
 
 Status InsertConsumer::Init() {
-  const char *kTableName = "twitter";
+  static const string kTableName = "twitter";
   Status s = client_->OpenTable(kTableName, &table_);
   if (s.IsNotFound()) {
     gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
@@ -83,21 +65,13 @@ Status InsertConsumer::Init() {
 
   session_ = client_->NewSession();
   session_->SetTimeoutMillis(1000);
-  CHECK_OK(session_->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  RETURN_NOT_OK(session_->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
   initted_ = true;
   return Status::OK();
 }
 
 InsertConsumer::~InsertConsumer() {
-  // TODO: to be safe, we probably need to cancel any current RPC,
-  // or else the callback will get called on the destroyed object.
-  // Given this is just demo code, cutting this corner.
-  CHECK(!request_pending_);
-}
-
-void InsertConsumer::BatchFinished(const Status& s) {
-  std::lock_guard<simple_spinlock> l(lock_);
-  request_pending_ = false;
+  Status s(session_->Flush());
   if (!s.ok()) {
     bool overflow;
     vector<client::KuduError*> errors;
@@ -140,21 +114,6 @@ void InsertConsumer::ConsumeJSON(const Slice& json_slice) {
   CHECK_OK(r->SetInt32("user_friends_count", event_.tweet_event.user_friends_count));
   CHECK_OK(r->SetStringCopy("user_image_url", event_.tweet_event.user_image_url));
   CHECK_OK(session_->Apply(ins.release()));
-
-  // TODO: once the auto-flush mode is implemented, switch to using that
-  // instead of the manual batching here
-  bool do_flush = false;
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-    if (!request_pending_) {
-      request_pending_ = true;
-      do_flush = true;
-    }
-  }
-  if (do_flush) {
-    VLOG(1) << "Sending batch of " << session_->CountBufferedOperations();
-    session_->FlushAsync(&flush_cb_);
-  }
 }
 
 } // namespace twitter_demo

http://git-wip-us.apache.org/repos/asf/kudu/blob/fd3a05ca/src/kudu/twitter-demo/insert_consumer.h
----------------------------------------------------------------------
diff --git a/src/kudu/twitter-demo/insert_consumer.h b/src/kudu/twitter-demo/insert_consumer.h
index 826ca11..00cdce4 100644
--- a/src/kudu/twitter-demo/insert_consumer.h
+++ b/src/kudu/twitter-demo/insert_consumer.h
@@ -40,40 +40,21 @@ class KuduStatusCallback;
 
 namespace twitter_demo {
 
-class InsertConsumer;
-
-class FlushCB : public client::KuduStatusCallback {
- public:
-  explicit FlushCB(InsertConsumer* consumer);
-
-  virtual ~FlushCB();
-
-  virtual void Run(const Status& status) OVERRIDE;
- private:
-  InsertConsumer* consumer_;
-};
-
 // Consumer of tweet data which parses the JSON and inserts
 // into a remote tablet via RPC.
 class InsertConsumer : public TwitterConsumer {
  public:
-  explicit InsertConsumer(
-    const client::sp::shared_ptr<client::KuduClient> &client);
+  explicit InsertConsumer(client::sp::shared_ptr<client::KuduClient> client);
   ~InsertConsumer();
 
   Status Init();
 
-  virtual void ConsumeJSON(const Slice& json) OVERRIDE;
+  virtual void ConsumeJSON(const Slice& json_slice) OVERRIDE;
 
  private:
-  friend class FlushCB;
-
-  void BatchFinished(const Status& s);
-
   bool initted_;
 
   client::KuduSchema schema_;
-  FlushCB flush_cb_;
   TwitterEventParser parser_;
 
   // Reusable object for latest event.
@@ -82,9 +63,6 @@ class InsertConsumer : public TwitterConsumer {
   client::sp::shared_ptr<client::KuduClient> client_;
   client::sp::shared_ptr<client::KuduSession> session_;
   client::sp::shared_ptr<client::KuduTable> table_;
-
-  simple_spinlock lock_;
-  bool request_pending_;
 };
 
 } // namespace twitter_demo