You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/02/11 12:21:02 UTC

[GitHub] arpadboda commented on a change in pull request #482: Minificpp 658 - Port Raw Site to Site to C

arpadboda commented on a change in pull request #482: Minificpp 658 - Port Raw Site to Site to C
URL: https://github.com/apache/nifi-minifi-cpp/pull/482#discussion_r255482815
 
 

 ##########
 File path: nanofi/src/sitetosite/CRawSocketProtocol.c
 ##########
 @@ -0,0 +1,1016 @@
+/**
+ * Site2SiteProtocol class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "uthash.h"
+#include "sitetosite/CRawSocketProtocol.h"
+#include "sitetosite/CPeer.h"
+
+#include "core/cstream.h"
+
+#include "api/nanofi.h"
+
+/*
+ * Names of the handshake properties as they are written to the peer.
+ * handShake() in this file indexes the table with the GZIP, PORT_IDENTIFIER,
+ * REQUEST_EXPIRATION_MILLIS, BATCH_COUNT, BATCH_SIZE and BATCH_DURATION
+ * constants.
+ */
+static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = {
+  /*
+   * Boolean value: whether the contents of a FlowFile should be GZipped
+   * when transferred.
+   */
+  "GZIP",
+
+  /* Unique identifier of the port to communicate with. */
+  "PORT_IDENTIFIER",
+
+  /*
+   * Number of milliseconds, counted from the moment the request was made,
+   * that the client waits for a response. Once it has elapsed without a
+   * response the server can skip the request, because the client will
+   * already have disconnected.
+   */
+  "REQUEST_EXPIRATION_MILLIS",
+
+  /*
+   * Preferred number of FlowFiles the server should send to the client when
+   * pulling data. Introduced in version 5 of the protocol.
+   */
+  "BATCH_COUNT",
+
+  /*
+   * Preferred number of bytes the server should send to the client when
+   * pulling data. Introduced in version 5 of the protocol.
+   */
+  "BATCH_SIZE",
+
+  /*
+   * Preferred amount of time, in milliseconds, the server should spend
+   * sending data to the client when pulling data. Introduced in version 5
+   * of the protocol.
+   */
+  "BATCH_DURATION"
+};
+
+/*
+ * One handshake property, stored in a uthash table keyed by the string that
+ * `name` points to.
+ */
+typedef struct {
+  const char *name;   /* hash key; handShake() points it at a HandShakePropertyStr entry */
+  char value[40];     /* property value as a NUL-terminated string */
+  UT_hash_handle hh;  /* uthash bookkeeping handle */
+} PropertyValue;
+
+/**
+ * Allocates one handshake property and appends it to the hash *properties.
+ *
+ * The value is copied with snprintf so that it is always NUL-terminated and
+ * never written past the end of PropertyValue.value.
+ *
+ * @param properties address of the hash head (updated by uthash)
+ * @param name       key string; only the pointer is stored, not a copy
+ * @param value      value string to copy into the new entry
+ * @return 0 on success, -1 if value is NULL, does not fit, or malloc fails
+ */
+static int addHandshakeProperty(PropertyValue **properties, const char *name, const char *value) {
+  if (value == NULL) {
+    return -1;
+  }
+
+  PropertyValue *entry = (PropertyValue *)malloc(sizeof *entry);
+  if (entry == NULL) {
+    return -1;
+  }
+
+  int written = snprintf(entry->value, sizeof entry->value, "%s", value);
+  if (written < 0 || (size_t)written >= sizeof entry->value) {
+    // Sending a silently truncated value would be worse than failing.
+    free(entry);
+    return -1;
+  }
+
+  entry->name = name;
+  HASH_ADD_KEYPTR(hh, *properties, entry->name, strlen(entry->name), entry);
+  return 0;
+}
+
+/**
+ * Formats an unsigned number as decimal text and adds it as a handshake
+ * property.
+ *
+ * @return 0 on success, -1 on failure (see addHandshakeProperty)
+ */
+static int addHandshakeNumericProperty(PropertyValue **properties, const char *name, unsigned long long value) {
+  char text[40];
+  int written = snprintf(text, sizeof text, "%llu", value);
+  if (written < 0 || (size_t)written >= sizeof text) {
+    return -1;
+  }
+  return addHandshakeProperty(properties, name, text);
+}
+
+/**
+ * Removes and frees every entry of the hash *properties. Calling it on an
+ * empty hash does nothing.
+ */
+static void freeHandshakeProperties(PropertyValue **properties) {
+  PropertyValue *entry, *next;
+  HASH_ITER(hh, *properties, entry, next) {
+    HASH_DEL(*properties, entry);
+    free(entry);
+  }
+}
+
+/**
+ * Performs the site-to-site handshake on an established peer.
+ *
+ * Writes a freshly generated communications identifier, the peer URL (for
+ * protocol version 3 and later), the number of handshake properties and then
+ * each property name/value pair to the peer stream. It then reads the
+ * response code and, when that code carries one, the description text.
+ *
+ * @param client site-to-site client; its peer_state_ must be ESTABLISHED
+ * @return  0 when the peer answered PROPERTIES_OK (peer_state_ becomes
+ *            HANDSHAKED),
+ *         -2 when the peer answered PORT_NOT_IN_VALID_STATE, UNKNOWN_PORT or
+ *            PORTS_DESTINATION_FULL,
+ *         -1 on any other failure (wrong state, allocation or I/O error,
+ *            unknown response code)
+ */
+int handShake(struct CRawSiteToSiteClient * client) {
+  if (client->peer_state_ != ESTABLISHED) {
+    //client->logger_->log_error("Site2Site peer state is not established while handshake");
+    return -1;
+  }
+  //client->logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", client->port_id_str_);
+
+  CIDGenerator gen;
+  gen.implementation_ = CUUID_DEFAULT_IMPL;
+  generate_uuid(&gen, client->_commsIdentifier);
+  client->_commsIdentifier[36]='\0';
+
+  int ret = writeUTF(client->_commsIdentifier, strlen(client->_commsIdentifier), False, client->peer_->stream_);
+
+  if (ret <= 0) {
+    return -1;
+  }
+
+  // Everything below leaves through `cleanup` so the property hash is
+  // released on every path.
+  int status = -1;
+  uint32_t prop_size = 3;
+  PropertyValue *properties = NULL;
+  PropertyValue *current, *tmp;
+  RespondCode code;
+  RespondCodeContext *resCode = NULL;
+  const char * error = "";
+
+  if (addHandshakeProperty(&properties, HandShakePropertyStr[GZIP], "false") != 0) {
+    goto cleanup;
+  }
+  if (addHandshakeProperty(&properties, HandShakePropertyStr[PORT_IDENTIFIER], client->port_id_str_) != 0) {
+    goto cleanup;
+  }
+  if (addHandshakeNumericProperty(&properties, HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS], client->_timeOut) != 0) {
+    goto cleanup;
+  }
+
+  // The batch hints are only sent from protocol version 5 on, and only when set.
+  if (client->_currentVersion >= 5) {
+    if (client->_batchCount > 0) {
+      if (addHandshakeNumericProperty(&properties, HandShakePropertyStr[BATCH_COUNT], client->_batchCount) != 0) {
+        goto cleanup;
+      }
+      prop_size++;
+    }
+    if (client->_batchSize > 0) {
+      if (addHandshakeNumericProperty(&properties, HandShakePropertyStr[BATCH_SIZE], client->_batchSize) != 0) {
+        goto cleanup;
+      }
+      prop_size++;
+    }
+    if (client->_batchDuration > 0) {
+      if (addHandshakeNumericProperty(&properties, HandShakePropertyStr[BATCH_DURATION], client->_batchDuration) != 0) {
+        goto cleanup;
+      }
+      prop_size++;
+    }
+  }
+
+  if (client->_currentVersion >= 3) {
+    //ret = client->peer_->writeUTF(client->peer_->getURL());
+    const char * urlstr = getURL(client->peer_);
+    if (urlstr == NULL) {
+      goto cleanup;
+    }
+    ret = writeUTF(urlstr, strlen(urlstr), False, client->peer_->stream_);
+    if (ret <= 0) {
+      goto cleanup;
+    }
+  }
+
+  ret = write_uint32_t(prop_size, client->peer_->stream_);
+  if (ret <= 0) {
+    goto cleanup;
+  }
+
+  HASH_ITER(hh, properties, current, tmp) {
+    if(writeUTF(current->name, strlen(current->name), False, client->peer_->stream_) <= 0) {
+      goto cleanup;
+    }
+    if(writeUTF(current->value, strlen(current->value), False, client->peer_->stream_) <= 0) {
+      goto cleanup;
+    }
+    //client->logger_->log_debug("Site2Site Protocol Send handshake properties %s %s", current->name, current->value);
+  }
+
+  // The properties are no longer needed once they have been sent.
+  freeHandshakeProperties(&properties);
+
+  ret = readResponse(client, &code);
+
+  if (ret <= 0) {
+    goto cleanup;
+  }
+
+  resCode = getRespondCodeContext(code);
+  if (resCode == NULL) {
+    // No context for this code: report it like any other unknown response.
+    goto cleanup;
+  }
+
+  if (resCode->hasDescription) {
+    uint32_t utflen;
+    ret = readUTFLen(&utflen, client->peer_->stream_);
+    if (ret <= 0) {
+      goto cleanup;
+    }
+
+    // NOTE(review): utflen comes from the peer and is not checked here against
+    // the capacity of description_buffer (declared outside this file) -
+    // confirm that a bound is enforced before merging.
+    memset(client->description_buffer, 0, utflen+1);
+    ret = readUTF(client->description_buffer, utflen, client->peer_->stream_);
+    if (ret <= 0) {
+      goto cleanup;
+    }
+  }
+
+  switch (code) {
+    case PROPERTIES_OK:
+      //client->logger_->log_debug("Site2Site HandShake Completed");
+      client->peer_state_ = HANDSHAKED;
+      status = 0;
+      break;
+    case PORT_NOT_IN_VALID_STATE:
+      error = "in invalid state";
+      status = -2;
+      break;
+    case UNKNOWN_PORT:
+      error = "an unknown port";
+      status = -2;
+      break;
+    case PORTS_DESTINATION_FULL:
+      error = "full";
+      status = -2;
+      break;
+    // Unknown error
+    default:
+      //client->logger_->log_error("HandShake Failed because of unknown respond code %d", code);
+      status = -1;
+      break;
+  }
+
+  // All known error cases handled here
+  //client->logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", client->port_id_str_, error);
+  (void)error;  // only consumed by the logging call that is still commented out
+
+cleanup:
+  freeHandshakeProperties(&properties);
+  return status;
+}
+
+
+/*bool CRawSiteToSiteClient::getPeerList(std::vector<CPeerStatus> &peers) {
 
 Review comment:
   I left this commented out here because it is planned to be added in a follow-up.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services