You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by wa...@apache.org on 2022/09/06 05:57:30 UTC

[incubator-kvrocks] branch unstable updated: Allows Kvrocks to listen to only the unix socket (#809)

This is an automated email from the ASF dual-hosted git repository.

wangyuan pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new dbd968b  Allows Kvrocks to listen to only the unix socket (#809)
dbd968b is described below

commit dbd968b47a08184b3a0b6492f28f9ba8edf6d11b
Author: Yaroslav <to...@gmail.com>
AuthorDate: Tue Sep 6 08:57:23 2022 +0300

    Allows Kvrocks to listen to only the unix socket (#809)
    
    If unix socket is specified, don't listen default TCP if addr:port wasn't explicitly set.
---
 kvrocks.conf      |  7 +++----
 src/config.cc     | 21 +++++++++++++++------
 src/config.h      |  4 +++-
 src/config_type.h |  2 +-
 src/main.cc       |  4 ++--
 src/server.cc     |  3 ++-
 src/worker.cc     |  3 ++-
 7 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index d472ee4..2f5c775 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -1,9 +1,8 @@
 ################################ GENERAL #####################################
 
-# By default kvrocks listens for connections from all the network interfaces
-# available on the server. It is possible to listen to just one or multiple
-# interfaces using the "bind" configuration directive, followed by one or
-# more IP addresses.
+# By default kvrocks listens for connections from localhost interface.
+# It is possible to listen to just one or multiple interfaces using 
+# the "bind" configuration directive, followed by one or more IP addresses.
 #
 # Examples:
 #
diff --git a/src/config.cc b/src/config.cc
index 2dae8aa..efa5779 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -44,6 +44,8 @@ const char *errNotEnableBlobDB = "Must set rocksdb.enable_blob_files to yes firs
 const char *errNotSetLevelCompactionDynamicLevelBytes =
             "Must set rocksdb.level_compaction_dynamic_level_bytes yes first.";
 
+const char *kDefaultBindAddress = "127.0.0.1";
+
 configEnum compression_type_enum[] = {
     {"no", rocksdb::CompressionType::kNoCompression},
     {"snappy", rocksdb::CompressionType::kSnappyCompression},
@@ -59,8 +61,6 @@ configEnum supervised_mode_enum[] = {
     {nullptr, 0}
 };
 
-ConfigField::~ConfigField() = default;
-
 std::string trimRocksDBPrefix(std::string s) {
   if (strncasecmp(s.data(), "rocksdb.", 8)) return s;
   return s.substr(8, s.size()-8);
@@ -88,14 +88,13 @@ Config::Config() {
     bool readonly;
     std::unique_ptr<ConfigField> field;
 
-    FieldWrapper(std::string name, bool readonly,
-                 ConfigField* field)
+    FieldWrapper(std::string name, bool readonly, ConfigField *field)
         : name(std::move(name)), readonly(readonly), field(field) {}
   };
   FieldWrapper fields[] = {
       {"daemonize", true, new YesNoField(&daemonize, false)},
-      {"bind", true, new StringField(&binds_, "127.0.0.1")},
-      {"port", true, new IntField(&port, 6666, 1, 65535)},
+      {"bind", true, new StringField(&binds_, "")},
+      {"port", true, new IntField(&port, kDefaultPort, 1, 65535)},
       {"workers", true, new IntField(&workers, 8, 1, 256)},
       {"timeout", false, new IntField(&timeout, 0, 0, INT_MAX)},
       {"tcp-backlog", true, new IntField(&backlog, 511, 0, INT_MAX)},
@@ -539,6 +538,16 @@ Status Config::finish() {
   if ((cluster_enabled) && !tokens.empty()) {
     return Status(Status::NotOK, "enabled cluster mode wasn't allowed while the namespace exists");
   }
+  if (unixsocket.empty() && binds.size() == 0) {
+    binds.emplace_back(kDefaultBindAddress);
+  }
+  if (cluster_enabled && binds.size() == 0) {
+    return Status(Status::NotOK, "node is in cluster mode, but TCP listen address "
+                                 "wasn't specified via configuration file");
+  }
+  if (master_port != 0 && binds.size() == 0) {
+    return Status(Status::NotOK, "replication doesn't supports unix socket");
+  }
   if (db_dir.empty()) db_dir = dir + "/db";
   if (backup_dir.empty()) backup_dir = dir + "/backup";
   if (log_dir.empty()) log_dir = dir;
diff --git a/src/config.h b/src/config.h
index 793d2ce..8d6e2bd 100644
--- a/src/config.h
+++ b/src/config.h
@@ -47,6 +47,7 @@ class Storage;
 const size_t KiB = 1024L;
 const size_t MiB = 1024L * KiB;
 const size_t GiB = 1024L * MiB;
+const int kDefaultPort = 6666;
 
 extern const char *kDefaultNamespace;
 
@@ -64,7 +65,8 @@ struct Config{
  public:
   Config();
   ~Config() = default;
-  int port = 6666;
+
+  int port = 0;
   int workers = 0;
   int timeout = 0;
   int loglevel = 0;
diff --git a/src/config_type.h b/src/config_type.h
index 6cf811e..6fa070b 100644
--- a/src/config_type.h
+++ b/src/config_type.h
@@ -42,7 +42,7 @@ const char *configEnumGetName(configEnum *ce, int val);
 class ConfigField {
  public:
   ConfigField() = default;
-  virtual ~ConfigField() = 0;
+  virtual ~ConfigField() = default;
   virtual std::string ToString() = 0;
   virtual Status Set(const std::string &v) = 0;
   virtual Status ToNumber(int64_t *n) { return Status(Status::NotOK, "not supported"); }
diff --git a/src/main.cc b/src/main.cc
index a61e2fa..348036c 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -285,11 +285,11 @@ int main(int argc, char* argv[]) {
     exit(1);
   }
   initGoogleLog(&config);
-  LOG(INFO)<< "Version: " << VERSION << " @" << GIT_COMMIT << std::endl;
+  LOG(INFO) << "Version: " << VERSION << " @" << GIT_COMMIT << std::endl;
   // Tricky: We don't expect that different instances running on the same port,
   // but the server use REUSE_PORT to support the multi listeners. So we connect
   // the listen port to check if the port has already listened or not.
-  if (Util::IsPortInUse(config.port)) {
+  if (config.binds.size() != 0 && Util::IsPortInUse(config.port)) {
     LOG(ERROR)<< "Could not create server TCP since the specified port["
               << config.port << "] is already in use" << std::endl;
     exit(1);
diff --git a/src/server.cc b/src/server.cc
index 607f2cd..3898f1c 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -60,10 +60,11 @@ Server::Server(Engine::Storage *storage, Config *config) :
     if (!config->unixsocket.empty() && i == 0) {
       Status s = worker->ListenUnixSocket(config->unixsocket, config->unixsocketperm, config->backlog);
       if (!s.IsOK()) {
-        LOG(ERROR) << "[server] Failed to listen on unix socket: "<< config->unixsocket
+        LOG(ERROR) << "[server] Failed to listen on unix socket: " << config->unixsocket
                    << ", encounter error: " << s.Msg();
         exit(1);
       }
+      LOG(INFO) << "[server] Listening on unix socket: " << config->unixsocket;
     }
     worker_threads_.emplace_back(Util::MakeUnique<WorkerThread>(std::move(worker)));
   }
diff --git a/src/worker.cc b/src/worker.cc
index 5a64167..8fafcd4 100644
--- a/src/worker.cc
+++ b/src/worker.cc
@@ -50,10 +50,11 @@ Worker::Worker(Server *svr, Config *config, bool repl) : svr_(svr) {
   for (const auto &bind : binds) {
     s = listenTCP(bind, port, config->backlog);
     if (!s.IsOK()) {
-      LOG(ERROR) << "[worker] Failed to listen on: "<< bind << ":" << port
+      LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << port
                  << ", encounter error: " << s.Msg();
       exit(1);
     }
+    LOG(INFO) << "[worker] Listening on: " << bind << ":" << port;
   }
 }