Posted to commits@ambari.apache.org by ol...@apache.org on 2018/11/19 16:13:59 UTC

[ambari-logsearch] 15/28: AMBARI-24825. Log Feeder: Fix HDFS/S3 outputs (#13)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch cloudbreak
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git

commit c3a750a73e9f34f99494dad581563c2ed4307a15
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Thu Oct 25 15:36:31 2018 +0200

    AMBARI-24825. Log Feeder: Fix HDFS/S3 outputs (#13)
---
 .../apache/ambari/logfeeder/output/OutputFile.java |   2 +-
 .../ambari/logfeeder/output/OutputHDFSFile.java    |   7 ++
 .../ambari/logfeeder/output/OutputS3File.java      |  17 ++-
 .../ambari/logfeeder/output/spool/LogSpooler.java  |   5 +-
 .../src/main/scripts/logfeeder.sh                  |   2 +-
 .../src/main/scripts/logsearch.sh                  |   2 +-
 docker/cloud-docker-compose.yml                    | 129 +++++++++++++++++++++
 .../logfeeder/shipper-conf/output.config.json      |  38 ++++++
 8 files changed, 196 insertions(+), 6 deletions(-)

diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
index 7dae1b8..7495444 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
@@ -98,7 +98,7 @@ public class OutputFile extends Output<LogFeederProps, InputFileMarker> {
     String outStr = null;
     CSVPrinter csvPrinter = null;
     try {
-      if (codec.equals("csv")) {
+      if ("csv".equals(codec)) {
         csvPrinter = new CSVPrinter(outWriter, CSVFormat.RFC4180);
         //TODO:
       } else {
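
Note on the change above: codec may be null (e.g. when no codec is configured
for the output), in which case codec.equals("csv") throws a
NullPointerException. The constant-first form "csv".equals(codec) is
null-safe. A minimal illustration (values hypothetical):

    String codec = null;               // e.g. no codec configured
    boolean a = "csv".equals(codec);   // false, no exception
    boolean b = codec.equals("csv");   // throws NullPointerException
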
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
index ed93aa4..93a2643 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -39,6 +39,7 @@ import java.io.File;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -114,6 +115,12 @@ public class OutputHDFSFile extends Output<LogFeederProps, InputFileMarker> impl
     }
   }
 
+  @Override
+  public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) throws Exception {
+    String block = LogFeederUtil.getGson().toJson(jsonObj);
+    write(block, inputMarker);
+  }
+
   
   @Override
   public String getShortDescription() {
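
Note on the change above: the added write(Map<String, Object>, InputFileMarker)
override lets the HDFS output accept a parsed log entry directly; Gson turns
the map into a single JSON line, which is then fed through the existing
write(String, ...) path. For illustration (sample fields hypothetical, plain
new Gson() standing in for LogFeederUtil.getGson()):

    Map<String, Object> entry = new HashMap<>();
    entry.put("level", "INFO");
    entry.put("log_message", "datanode registered");
    String block = new Gson().toJson(entry);
    // block is a JSON line, e.g. {"level":"INFO","log_message":"datanode registered"}
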
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index 7d7e6af..a2f6b08 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -192,12 +192,22 @@ public class OutputS3File extends OutputFile implements RolloverCondition, Rollo
    */
   @Override
   public void write(String block, InputFileMarker inputMarker) {
+    createLogSpoolerIfRequired(inputMarker);
+    logSpooler.add(block);
+  }
+
+  @Override
+  public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) throws Exception {
+    String block = LogFeederUtil.getGson().toJson(jsonObj);
+    write(block, inputMarker);
+  }
+
+  private void createLogSpoolerIfRequired(InputFileMarker inputMarker) {
     if (logSpooler == null) {
       if (inputMarker.getInput().getClass().isAssignableFrom(InputFile.class)) {
         InputFile input = (InputFile) inputMarker.getInput();
         logSpooler = createSpooler(input.getFilePath());
         s3Uploader = createUploader(input.getInputDescriptor().getType());
-        logSpooler.add(block);
       } else {
         logger.error("Cannot write from non local file...");
       }
@@ -261,4 +271,9 @@ public class OutputS3File extends OutputFile implements RolloverCondition, Rollo
   public void handleRollover(File rolloverFile) {
     s3Uploader.addFileForUpload(rolloverFile.getAbsolutePath());
   }
+
+  @Override
+  public String getShortDescription() {
+    return "output:destination=s3,bucket=" + s3OutputConfiguration.getS3BucketName();
+  }
 }
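
Note on the changes above: extracting createLogSpoolerIfRequired() separates
one-time spooler setup from the per-block work, so logSpooler.add(block) now
runs on every write call rather than sitting inside the initialization branch.
The same Map-to-String delegation as in OutputHDFSFile is added, plus a
getShortDescription() naming the target bucket. The write path reduced to its
core (a sketch, names simplified):

    private LogSpooler logSpooler;

    public void write(String block, InputFileMarker inputMarker) {
      createLogSpoolerIfRequired(inputMarker); // builds spooler and uploader on first use
      logSpooler.add(block);                   // buffers the block on every call
    }
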
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
index 7d7d111..82a3f1b 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
@@ -45,8 +45,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class LogSpooler {
   
   private static final Logger logger = LogManager.getLogger(LogSpooler.class);
-  public static final long TIME_BASED_ROLLOVER_DISABLED_THRESHOLD = 0;
-  static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
+
+  private static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
+  private static final long TIME_BASED_ROLLOVER_DISABLED_THRESHOLD = 0;
 
   private String spoolDirectory;
   private String sourceFileNamePrefix;
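
Note on the change above: both constants become private, presumably because
nothing outside LogSpooler references them any longer. A threshold of 0 is the
sentinel that disables time-based rollover; conceptually the check is (field
and helper names assumed, not the actual code):

    // schedule time-based rollover only when a non-zero threshold is configured
    if (rolloverTimeThresholdSecs != TIME_BASED_ROLLOVER_DISABLED_THRESHOLD) {
      startRolloverTimer();
    }
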
diff --git a/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh b/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh
index e125768..eaf00e1 100755
--- a/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh
+++ b/ambari-logsearch-logfeeder/src/main/scripts/logfeeder.sh
@@ -168,7 +168,7 @@ function start() {
   LOGFEEDER_DEBUG_PORT=${LOGFEEDER_DEBUG_PORT:-"5006"}
 
   if [ "$LOGFEEDER_DEBUG" = "true" ]; then
-    LOGFEEDER_JAVA_OPTS="$LOGFEEDER_JAVA_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,address=$LOGFEEDER_DEBUG_PORT,server=y,suspend=$LOGFEEDER_DEBUG_SUSPEND "
+    LOGFEEDER_JAVA_OPTS="$LOGFEEDER_JAVA_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,address=*:$LOGFEEDER_DEBUG_PORT,server=y,suspend=$LOGFEEDER_DEBUG_SUSPEND "
   fi
 
   if [ "$LOGFEEDER_SSL" = "true" ]; then
diff --git a/ambari-logsearch-server/src/main/scripts/logsearch.sh b/ambari-logsearch-server/src/main/scripts/logsearch.sh
index b941e25..1b64832 100755
--- a/ambari-logsearch-server/src/main/scripts/logsearch.sh
+++ b/ambari-logsearch-server/src/main/scripts/logsearch.sh
@@ -149,7 +149,7 @@ function start() {
   LOGSEARCH_DEBUG_PORT=${LOGSEARCH_DEBUG_PORT:-"5005"}
 
   if [ "$LOGSEARCH_DEBUG" = "true" ]; then
-    LOGSEARCH_JAVA_OPTS="$LOGSEARCH_JAVA_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,address=$LOGSEARCH_DEBUG_PORT,server=y,suspend=$LOGSEARCH_DEBUG_SUSPEND "
+    LOGSEARCH_JAVA_OPTS="$LOGSEARCH_JAVA_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,address=*:$LOGSEARCH_DEBUG_PORT,server=y,suspend=$LOGSEARCH_DEBUG_SUSPEND "
   fi
 
   if [ "$LOGSEARCH_SSL" = "true" ]; then
diff --git a/docker/cloud-docker-compose.yml b/docker/cloud-docker-compose.yml
new file mode 100644
index 0000000..3a9ec05
--- /dev/null
+++ b/docker/cloud-docker-compose.yml
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+version: '3.3'
+services:
+  zookeeper:
+    image: zookeeper:${ZOOKEEPER_VERSION:-3.4.10}
+    restart: always
+    hostname: zookeeper
+    networks:
+      - logsearch-network
+    ports:
+      - 2181:2181
+    environment:
+      ZOO_MY_ID: 1
+      ZOO_SERVERS: server.1=zookeeper:2888:3888
+  solr:
+    image: solr:${SOLR_VERSION:-7.5.0}
+    restart: always
+    hostname: solr
+    ports:
+      - "8983:8983"
+    networks:
+      - logsearch-network
+    env_file:
+      - Profile
+    entrypoint:
+      - docker-entrypoint.sh
+      - solr
+      - start
+      - "-f"
+      - "-c"
+      - "-z"
+      - ${ZOOKEEPER_CONNECTION_STRING}
+  logsearch:
+    image: ambari-logsearch:v1.0
+    restart: always
+    hostname: logsearch.apache.org
+    labels:
+      logfeeder.log.type: "logsearch_server"
+    networks:
+      - logsearch-network
+    env_file:
+      - Profile
+    ports:
+      - 61888:61888
+      - 4444:4444
+      - 5005:5005
+    environment:
+      COMPONENT: logsearch
+      COMPONENT_LOG: logsearch
+      ZK_CONNECT_STRING: ${ZOOKEEPER_CONNECTION_STRING}
+      DISPLAY: $DISPLAY_MAC
+    volumes:
+      - $AMBARI_LOCATION:/root/ambari
+      - $AMBARI_LOCATION/ambari-logsearch/docker/test-logs:/root/test-logs
+      - $AMBARI_LOCATION/ambari-logsearch/docker/test-config:/root/test-config
+  logfeeder:
+    image: ambari-logsearch:v1.0
+    restart: always
+    hostname: logfeeder.apache.org
+    privileged: true
+    labels:
+      logfeeder.log.type: "logfeeder"
+    networks:
+      - logsearch-network
+    env_file:
+      - Profile
+    ports:
+      - 5006:5006
+    environment:
+      COMPONENT: logfeeder
+      COMPONENT_LOG: logfeeder
+      ZK_CONNECT_STRING: ${ZOOKEEPER_CONNECTION_STRING}
+    volumes:
+      - $AMBARI_LOCATION:/root/ambari
+      - $AMBARI_LOCATION/ambari-logsearch/docker/test-logs:/root/test-logs
+      - $AMBARI_LOCATION/ambari-logsearch/docker/test-config:/root/test-config
+      - /var/run/docker.sock:/var/run/docker.sock
+      - /usr/local/bin/docker:/usr/local/bin/docker
+      - /var/lib/docker:/var/lib/docker
+  fakes3:
+    image: localstack/localstack
+    hostname: fakes3
+    ports:
+      - "4569:4569"
+    environment:
+      - SERVICES=s3:4569
+      - DEBUG=s3
+    networks:
+      logsearch-network:
+        aliases:
+          - testbucket.fakes3
+    env_file:
+      - Profile
+  namenode:
+    image: flokkr/hadoop-hdfs-namenode:${HADOOP_VERSION:-3.0.0}
+    hostname: namenode
+    ports:
+      - 9870:9870
+      - 9000:9000
+    env_file:
+      - Profile
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-hdfs/dfs/name"
+    networks:
+      - logsearch-network
+  datanode:
+    image: flokkr/hadoop-hdfs-datanode:${HADOOP_VERSION:-3.0.0}
+    links:
+      - namenode
+    env_file:
+      - Profile
+    networks:
+      - logsearch-network
+networks:
+   logsearch-network:
+      driver: bridge
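
Note on the new compose file: fakes3 (localstack) stands in for S3, and the
extra network alias testbucket.fakes3 lets virtual-hosted-style bucket URLs
(bucket.endpoint) resolve inside the compose network; the namenode/datanode
pair backs the HDFS output. A hedged sketch of pointing an S3 client at this
endpoint (AWS SDK v1; region assumed, credentials and bucket taken from
output.config.json below):

    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.client.builder.AwsClientBuilder;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;

    public class FakeS3Check {
      public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
            .withCredentials(new AWSStaticCredentialsProvider(
                new BasicAWSCredentials("accessKey", "secretKey")))
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                "http://fakes3:4569", "us-east-1"))
            .withPathStyleAccessEnabled(true) // or rely on the testbucket.fakes3 alias
            .build();
        s3.createBucket("docker-logsearch"); // bucket name from output.config.json
        System.out.println(s3.listBuckets());
      }
    }
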
diff --git a/docker/test-config/logfeeder/shipper-conf/output.config.json b/docker/test-config/logfeeder/shipper-conf/output.config.json
index a85b4a4..62950d1 100644
--- a/docker/test-config/logfeeder/shipper-conf/output.config.json
+++ b/docker/test-config/logfeeder/shipper-conf/output.config.json
@@ -31,6 +31,44 @@
           ]
         }
       }
+    },
+    {
+      "comment": "S3 file output",
+      "is_enabled": "true",
+      "destination": "s3_file",
+      "type": "s3",
+      "s3_access_key" : "accessKey",
+      "s3_secret_key" : "secretKey",
+      "s3_bucket" : "docker-logsearch",
+      "s3_endpoint" : "http://fakes3:4569",
+      "s3_log_dir" : "/tmp",
+      "skip_logtime": "true",
+      "conditions": {
+        "fields": {
+          "rowtype": [
+            "s3"
+          ]
+        }
+      }
+    },
+    {
+      "comment": "HDFS file output",
+      "is_enabled": "true",
+      "destination": "hdfs",
+      "type": "hdfs",
+      "file_name_prefix":"service-logs-",
+      "hdfs_out_dir": "/logfeeder/$HOST/service",
+      "hdfs_host": "namenode",
+      "hdfs_port": "9000",
+      "rollover_sec":"10",
+      "skip_logtime": "true",
+      "conditions": {
+        "fields": {
+          "rowtype": [
+            "hdfs"
+          ]
+        }
+      }
     }
   ]
 }
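
Note on the config above: routing is driven by conditions.fields, so an entry
whose rowtype field is "s3" goes to the S3 output and one whose rowtype is
"hdfs" goes to the HDFS output; hdfs_host/hdfs_port line up with the namenode
service in the compose file, and a rollover_sec of 10 keeps the test loop
fast. The match itself is conceptually just (a sketch, not the actual
matcher):

    boolean toHdfs = "hdfs".equals(entry.get("rowtype"));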