You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/09/23 19:10:13 UTC
[2/4] kudu git commit: [twitter-demo] use AUTO_FLUSH_BACKGROUND session
[twitter-demo] use AUTO_FLUSH_BACKGROUND session
Changed the twitter demo application to use AUTO_FLUSH_BACKGROUND
flush mode instead of MANUAL_FLUSH mode.
Change-Id: I497c1265df132fc8ea4e635475d0d669eca21646
Reviewed-on: http://gerrit.cloudera.org:8080/4477
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fd3a05ca
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fd3a05ca
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fd3a05ca
Branch: refs/heads/master
Commit: fd3a05ca12fec0b803cedbf15c03fd498724ee42
Parents: 376f95b
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue Sep 20 08:55:04 2016 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Fri Sep 23 19:08:12 2016 +0000
----------------------------------------------------------------------
src/kudu/twitter-demo/CMakeLists.txt | 89 +++++++++++++++------------
src/kudu/twitter-demo/insert_consumer.cc | 61 +++---------------
src/kudu/twitter-demo/insert_consumer.h | 26 +-------
3 files changed, 61 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/fd3a05ca/src/kudu/twitter-demo/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/twitter-demo/CMakeLists.txt b/src/kudu/twitter-demo/CMakeLists.txt
index 5c261e2..484f2d1 100644
--- a/src/kudu/twitter-demo/CMakeLists.txt
+++ b/src/kudu/twitter-demo/CMakeLists.txt
@@ -15,50 +15,59 @@
# specific language governing permissions and limitations
# under the License.
-find_library(LIBOAUTH_LIBRARY NAMES oauth)
-if(NOT LIBOAUTH_LIBRARY)
- message(WARNING "liboauth not found on system. Skipping twitter demo")
+# Use pkg-config to configure the build for liboauth. This makes it possible
+# to extract the include and library paths, etc. The liboauth library
+# is installed at an alternative location on Mac OS X.
+find_package(PkgConfig)
+if (NOT PKG_CONFIG_FOUND)
+ message(WARNING "pkgconfig not found. Skipping twitter demo.")
else()
+ pkg_search_module(LIBOAUTH oauth)
+ if(NOT LIBOAUTH_FOUND)
+ message(WARNING "liboauth not found. Skipping twitter demo.")
+ else()
+ include_directories(SYSTEM ${LIBOAUTH_INCLUDE_DIRS})
+ link_directories(${LIBOAUTH_LIBRARY_DIRS})
+ add_library(twitter_demo
+ oauth.cc
+ parser.cc
+ insert_consumer.cc
+ twitter_streamer.cc)
- add_library(twitter_demo
- oauth.cc
- parser.cc
- insert_consumer.cc
- twitter_streamer.cc)
+ target_link_libraries(twitter_demo
+ gutil
+ kudu_util
+ kudu_test_util)
- target_link_libraries(twitter_demo
- gutil
- kudu_util
- kudu_test_util)
+ target_link_libraries(twitter_demo
+ kudu_client
+ ${LIBOAUTH_LIBRARIES}
+ ${CURL_LIBRARIES}
+ ${KUDU_BASE_LIBS})
- target_link_libraries(twitter_demo
- kudu_client
- ${LIBOAUTH_LIBRARY}
- ${CURL_LIBRARIES}
- ${KUDU_BASE_LIBS})
+ # Require that the tserver protobuf code is generated first
+ add_dependencies(twitter_demo
+ tserver_proto)
- # Require that the tserver protobuf code is generated first
- add_dependencies(twitter_demo
- tserver_proto)
-
- add_executable(ingest_firehose ingest_firehose.cc)
- target_link_libraries(ingest_firehose
- twitter_demo)
-
- # Tests
- ADD_KUDU_TEST(oauth-test)
- # parser-test relies on symlinked data files which we can't currently copy correctly
- # to the cluster.
- ADD_KUDU_TEST(parser-test LABELS no_dist_test)
- if(NOT "${NO_TESTS}")
- target_link_libraries(oauth-test
+ add_executable(ingest_firehose ingest_firehose.cc)
+ target_link_libraries(ingest_firehose
twitter_demo)
- target_link_libraries(parser-test
- twitter_demo)
- execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-tweets.txt
- ${EXECUTABLE_OUTPUT_PATH})
- execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-deletes.txt
- ${EXECUTABLE_OUTPUT_PATH})
- endif()
-endif() # library checks
+ # Tests
+ ADD_KUDU_TEST(oauth-test)
+ # parser-test relies on symlinked data files which we can't currently copy correctly
+ # to the cluster.
+ ADD_KUDU_TEST(parser-test LABELS no_dist_test)
+ if(NOT "${NO_TESTS}")
+ target_link_libraries(oauth-test
+ twitter_demo)
+ target_link_libraries(parser-test
+ twitter_demo)
+ execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-tweets.txt
+ ${EXECUTABLE_OUTPUT_PATH})
+ execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-deletes.txt
+ ${EXECUTABLE_OUTPUT_PATH})
+ endif()
+
+ endif() # if(NOT LIBOAUTH_FOUND) ... else ...
+endif() # if (NOT PKG_CONFIG_FOUND) ... else ...
http://git-wip-us.apache.org/repos/asf/kudu/blob/fd3a05ca/src/kudu/twitter-demo/insert_consumer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/twitter-demo/insert_consumer.cc b/src/kudu/twitter-demo/insert_consumer.cc
index 2371f7a..361b8bd 100644
--- a/src/kudu/twitter-demo/insert_consumer.cc
+++ b/src/kudu/twitter-demo/insert_consumer.cc
@@ -17,20 +17,21 @@
#include "kudu/twitter-demo/insert_consumer.h"
-#include <glog/logging.h>
+#include <ctime>
#include <mutex>
#include <string>
-#include <time.h>
#include <vector>
+#include <glog/logging.h>
+
+#include "kudu/client/client.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/row.h"
#include "kudu/common/schema.h"
-#include "kudu/client/client.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/stl_util.h"
-#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/twitter-demo/parser.h"
#include "kudu/twitter-demo/twitter-schema.h"
#include "kudu/util/status.h"
@@ -38,38 +39,19 @@
namespace kudu {
namespace twitter_demo {
-using tserver::TabletServerServiceProxy;
-using tserver::WriteRequestPB;
-using tserver::WriteResponsePB;
-using rpc::RpcController;
using kudu::client::KuduInsert;
using kudu::client::KuduClient;
using kudu::client::KuduSession;
-using kudu::client::KuduStatusCallback;
-using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
-FlushCB::FlushCB(InsertConsumer* consumer)
- : consumer_(consumer) {
-}
-
-FlushCB::~FlushCB() {
-}
-
-void FlushCB::Run(const Status& status) {
- consumer_->BatchFinished(status);
-}
-
-InsertConsumer::InsertConsumer(const client::sp::shared_ptr<KuduClient> &client)
+InsertConsumer::InsertConsumer(client::sp::shared_ptr<KuduClient> client)
: initted_(false),
schema_(CreateTwitterSchema()),
- flush_cb_(this),
- client_(client),
- request_pending_(false) {
+ client_(std::move(client)) {
}
Status InsertConsumer::Init() {
- const char *kTableName = "twitter";
+ static const string kTableName = "twitter";
Status s = client_->OpenTable(kTableName, &table_);
if (s.IsNotFound()) {
gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
@@ -83,21 +65,13 @@ Status InsertConsumer::Init() {
session_ = client_->NewSession();
session_->SetTimeoutMillis(1000);
- CHECK_OK(session_->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ RETURN_NOT_OK(session_->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
initted_ = true;
return Status::OK();
}
InsertConsumer::~InsertConsumer() {
- // TODO: to be safe, we probably need to cancel any current RPC,
- // or else the callback will get called on the destroyed object.
- // Given this is just demo code, cutting this corner.
- CHECK(!request_pending_);
-}
-
-void InsertConsumer::BatchFinished(const Status& s) {
- std::lock_guard<simple_spinlock> l(lock_);
- request_pending_ = false;
+ Status s(session_->Flush());
if (!s.ok()) {
bool overflow;
vector<client::KuduError*> errors;
@@ -140,21 +114,6 @@ void InsertConsumer::ConsumeJSON(const Slice& json_slice) {
CHECK_OK(r->SetInt32("user_friends_count", event_.tweet_event.user_friends_count));
CHECK_OK(r->SetStringCopy("user_image_url", event_.tweet_event.user_image_url));
CHECK_OK(session_->Apply(ins.release()));
-
- // TODO: once the auto-flush mode is implemented, switch to using that
- // instead of the manual batching here
- bool do_flush = false;
- {
- std::lock_guard<simple_spinlock> l(lock_);
- if (!request_pending_) {
- request_pending_ = true;
- do_flush = true;
- }
- }
- if (do_flush) {
- VLOG(1) << "Sending batch of " << session_->CountBufferedOperations();
- session_->FlushAsync(&flush_cb_);
- }
}
} // namespace twitter_demo
http://git-wip-us.apache.org/repos/asf/kudu/blob/fd3a05ca/src/kudu/twitter-demo/insert_consumer.h
----------------------------------------------------------------------
diff --git a/src/kudu/twitter-demo/insert_consumer.h b/src/kudu/twitter-demo/insert_consumer.h
index 826ca11..00cdce4 100644
--- a/src/kudu/twitter-demo/insert_consumer.h
+++ b/src/kudu/twitter-demo/insert_consumer.h
@@ -40,40 +40,21 @@ class KuduStatusCallback;
namespace twitter_demo {
-class InsertConsumer;
-
-class FlushCB : public client::KuduStatusCallback {
- public:
- explicit FlushCB(InsertConsumer* consumer);
-
- virtual ~FlushCB();
-
- virtual void Run(const Status& status) OVERRIDE;
- private:
- InsertConsumer* consumer_;
-};
-
// Consumer of tweet data which parses the JSON and inserts
// into a remote tablet via RPC.
class InsertConsumer : public TwitterConsumer {
public:
- explicit InsertConsumer(
- const client::sp::shared_ptr<client::KuduClient> &client);
+ explicit InsertConsumer(client::sp::shared_ptr<client::KuduClient> client);
~InsertConsumer();
Status Init();
- virtual void ConsumeJSON(const Slice& json) OVERRIDE;
+ virtual void ConsumeJSON(const Slice& json_slice) OVERRIDE;
private:
- friend class FlushCB;
-
- void BatchFinished(const Status& s);
-
bool initted_;
client::KuduSchema schema_;
- FlushCB flush_cb_;
TwitterEventParser parser_;
// Reusable object for latest event.
@@ -82,9 +63,6 @@ class InsertConsumer : public TwitterConsumer {
client::sp::shared_ptr<client::KuduClient> client_;
client::sp::shared_ptr<client::KuduSession> session_;
client::sp::shared_ptr<client::KuduTable> table_;
-
- simple_spinlock lock_;
- bool request_pending_;
};
} // namespace twitter_demo