You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/07/16 16:10:15 UTC
[ambari] branch trunk updated: AMBARI-24183. Log Feeder: read and
ship docker container logs. (#1763)
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new c046095 AMBARI-24183. Log Feeder: read and ship docker container logs. (#1763)
c046095 is described below
commit c0460952992610d6f38a86828f44a9aaf9a5a222
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Mon Jul 16 18:10:10 2018 +0200
AMBARI-24183. Log Feeder: read and ship docker container logs. (#1763)
---
.../api/model/inputconfig/InputFileDescriptor.java | 2 +
.../inputconfig/impl/InputFileDescriptorImpl.java | 20 +++
.../pom.xml | 83 +++++++++++
.../ambari/logfeeder/ContainerMetadata.java} | 42 +++++-
.../ambari/logfeeder/ContainerRegistry.java} | 26 +++-
.../logfeeder/docker/DockerContainerRegistry.java | 145 ++++++++++++++++++
.../docker/DockerContainerRegistryMonitor.java | 50 +++++++
.../ambari/logfeeder/docker/DockerMetadata.java | 81 ++++++++++
.../docker/command/CommandExecutionHelper.java | 52 +++++++
.../logfeeder/docker/command/CommandResponse.java} | 34 ++++-
.../docker/command/ContainerCommand.java} | 25 ++--
.../command/DockerInspectContainerCommand.java | 61 ++++++++
.../docker/command/DockerListContainerCommand.java | 54 +++++++
.../src/main/resources/log4j.properties} | 23 +--
.../ambari/logfeeder/plugin/input/Input.java | 9 ++
.../ambari-logsearch-logfeeder/pom.xml | 5 +
.../ambari/logfeeder/common/ConfigHandler.java | 1 +
.../logfeeder/common/LogFeederConstants.java | 3 +
.../ambari/logfeeder/conf/ApplicationConfig.java | 14 ++
.../ambari/logfeeder/conf/LogFeederProps.java | 18 +++
.../ambari/logfeeder/filter/DockerLogFilter.java} | 20 ++-
.../apache/ambari/logfeeder/filter/FilterGrok.java | 14 ++
.../apache/ambari/logfeeder/input/InputFile.java | 166 +++++++++++++++++----
.../ambari/logfeeder/input/InputManagerImpl.java | 17 +++
.../input/monitor/DockerLogFileUpdateMonitor.java | 101 +++++++++++++
.../logsearch/model/common/LSServerInputFile.java | 12 ++
ambari-logsearch/docker/docker-compose.yml | 8 +
.../test-config/logfeeder/logfeeder.properties | 3 +-
.../input.config-logsearch-docker.json | 31 ++++
ambari-logsearch/pom.xml | 1 +
30 files changed, 1029 insertions(+), 92 deletions(-)
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
index b58db6a..2689f82 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
@@ -27,4 +27,6 @@ public interface InputFileDescriptor extends InputFileBaseDescriptor {
Integer getPathUpdateIntervalMin();
Integer getMaxAgeMin();
+
+ Boolean getDockerEnabled();
}
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
index 99b42fe..2ba53e6 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
@@ -70,6 +70,17 @@ public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl impleme
@SerializedName("max_age_min")
private Integer maxAgeMin;
+ @ShipperConfigElementDescription(
+ path = "/input/[]/docker",
+ type = "boolean",
+ description = "Input comes from a docker container.",
+ examples = {"true", "false"},
+ defaultValue = "false"
+ )
+ @Expose
+ @SerializedName("docker")
+ private Boolean dockerEnabled;
+
@Override
public Integer getDetachIntervalMin() {
return this.detachIntervalMin;
@@ -90,6 +101,11 @@ public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl impleme
return this.maxAgeMin;
}
+ @Override
+ public Boolean getDockerEnabled() {
+ return dockerEnabled;
+ }
+
public void setDetachIntervalMin(Integer detachIntervalMin) {
this.detachIntervalMin = detachIntervalMin;
}
@@ -105,4 +121,8 @@ public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl impleme
public void setMaxAgeMin(Integer maxAgeMin) {
this.maxAgeMin = maxAgeMin;
}
+
+ public void setDockerEnabled(Boolean dockerEnabled) {
+ this.dockerEnabled = dockerEnabled;
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/pom.xml
new file mode 100644
index 0000000..66333f5
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>ambari-logsearch</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>ambari-logsearch-logfeeder-container-registry</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.9.4</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>2.9.4</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.6</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <source>${jdk.version}</source>
+ <target>${jdk.version}</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/log4j.properties</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerMetadata.java
similarity index 54%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerMetadata.java
index b58db6a..df3a80a 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerMetadata.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,41 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.ambari.logfeeder;
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+/**
+ * Holds container related metadata
+ **/
+public interface ContainerMetadata {
-public interface InputFileDescriptor extends InputFileBaseDescriptor {
- Integer getDetachIntervalMin();
+ /**
+ * Id of the container, used for getting the right log path
+ * @return container id
+ */
+ String getId();
- Integer getDetachTimeMin();
+ /**
+ * Name of the container
+ * @return container name
+ */
+ String getName();
- Integer getPathUpdateIntervalMin();
+ /**
+ * Hostname of the container, can be container host itself or the actual hostname
+ * @return container host name
+ */
+ String getHostName();
+
+ /**
+ * Log label
+ * @return log type label
+ */
+ String getLogTypeLabel();
+
+ /**
+ * Log path of the container (should be json file)
+ * @return log path
+ */
+ String getLogPath();
- Integer getMaxAgeMin();
}
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerRegistry.java
similarity index 58%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerRegistry.java
index b58db6a..94f6a82 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerRegistry.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.ambari.logfeeder;
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import java.util.Map;
-public interface InputFileDescriptor extends InputFileBaseDescriptor {
- Integer getDetachIntervalMin();
+/**
+ * Responsible of register or drop new / existing containers.
+ * @param <METADATA_TYPE> type of metadata - could be docker or other container implementation
+ */
+public interface ContainerRegistry<METADATA_TYPE extends ContainerMetadata> {
- Integer getDetachTimeMin();
+ /**
+ * Register process of running containers
+ */
+ void register();
- Integer getPathUpdateIntervalMin();
+ /**
+ * Holds container metadata per log component type and container id.
+ * @return container metadata
+ */
+ Map<String, Map<String, METADATA_TYPE>> getContainerMetadataMap();
- Integer getMaxAgeMin();
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistry.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistry.java
new file mode 100644
index 0000000..c3e816e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistry.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker;
+
+import org.apache.ambari.logfeeder.ContainerRegistry;
+import org.apache.ambari.logfeeder.docker.command.DockerInspectContainerCommand;
+import org.apache.ambari.logfeeder.docker.command.DockerListContainerCommand;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * Register docker metadata from docker containers on the host (with listing containers and inspecting them)
+ */
+public final class DockerContainerRegistry implements ContainerRegistry<DockerMetadata> {
+
+ private static final String LOGFEEDER_CONTAINER_REGISTRY_DOCKER_INTERVAL = "logfeeder.container.registry.docker.interval";
+ private static final Logger logger = LoggerFactory.getLogger(DockerContainerRegistry.class);
+
+ private static DockerContainerRegistry INSTANCE = null;
+ private final Properties configs;
+ private Map<String, Map<String, DockerMetadata>> dockerMetadataMap = new ConcurrentHashMap<>();
+ private int waitIntervalMin = 5;
+
+ private DockerContainerRegistry(Properties configs) {
+ this.configs = configs;
+ init(configs);
+ }
+
+ @Override
+ public synchronized void register() {
+ Map<String, Map<String, DockerMetadata>> actualDockerMetadataMap = renewMetadata();
+ if (!actualDockerMetadataMap.isEmpty()) {
+ dockerMetadataMap.putAll(actualDockerMetadataMap);
+ dockerMetadataMap = dockerMetadataMap
+ .entrySet()
+ .stream()
+ .filter(e -> actualDockerMetadataMap.keySet().contains(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ for (Map.Entry<String, Map<String, DockerMetadata>> entry : dockerMetadataMap.entrySet()) {
+ for (Map.Entry<String, DockerMetadata> metadataEntry : entry.getValue().entrySet()) {
+ logger.debug("Found container metadata: {}", entry.getValue().toString());
+ }
+ }
+ }
+ }
+
+ private Map<String, Map<String, DockerMetadata>> renewMetadata() {
+ final Map<String, Map<String, DockerMetadata>> actualDockerMetadataMap = new HashMap<>();
+ final List<String> containerIds = new DockerListContainerCommand().execute(null);
+ final Map<String, String> params = new HashMap<>();
+
+ params.put("containerIds", StringUtils.join(containerIds, ","));
+ List<Map<String, Object>> containerDataList = new DockerInspectContainerCommand().execute(params);
+
+ for (Map<String, Object> containerDataMap : containerDataList) {
+ String id = containerDataMap.get("Id").toString();
+ String name = containerDataMap.get("Name").toString();
+ String logPath = containerDataMap.get("LogPath").toString();
+ Map<String, Object> dockerConfigMap = (HashMap<String, Object>) containerDataMap.get("Config");
+ String hostname = dockerConfigMap.get("Hostname").toString();
+ Map<String, String> labels = (Map<String, String>) dockerConfigMap.get("Labels");
+ Map<String, Object> stateMap = (HashMap<String, Object>) containerDataMap.get("State");
+ String componentType = labels.get("logfeeder.log.type");
+ boolean running = (Boolean) stateMap.get("Running");
+ long timestamp = running ? convertDateStrToLong((String)stateMap.get("StartedAt")) : convertDateStrToLong((String)stateMap.get("FinishedAt"));
+
+ if (componentType != null) {
+ if (actualDockerMetadataMap.containsKey(componentType)) {
+ Map<String, DockerMetadata> componentMetadataMap = actualDockerMetadataMap.get(componentType);
+ componentMetadataMap.put(id, new DockerMetadata(id, name, hostname, componentType, logPath, running, timestamp));
+ actualDockerMetadataMap.put(componentType, componentMetadataMap);
+ } else {
+ Map<String, DockerMetadata> componentMetadataMap = new HashMap<>();
+ componentMetadataMap.put(id, new DockerMetadata(id, name, hostname, componentType, logPath, running, timestamp));
+ actualDockerMetadataMap.put(componentType, componentMetadataMap);
+ }
+ } else {
+ logger.debug("Ignoring docker metadata from registry as container (id: {}, name: {}) as it has no 'logfeeder.log.type' label", id, name);
+ }
+ }
+
+ return actualDockerMetadataMap;
+ }
+
+ @Override
+ public synchronized Map<String, Map<String, DockerMetadata>> getContainerMetadataMap() {
+ return dockerMetadataMap;
+ }
+
+ public void init(Properties configs) {
+ // init docker related data
+ String waitStr = configs.getProperty(LOGFEEDER_CONTAINER_REGISTRY_DOCKER_INTERVAL, "5");
+ setWaitIntervalMin(Integer.parseInt(waitStr));
+ // TODO: add docker authentication settings through this
+ }
+
+ public static synchronized DockerContainerRegistry getInstance(Properties dockerConfig) {
+ if (INSTANCE == null) {
+ return new DockerContainerRegistry(dockerConfig);
+ } else {
+ return INSTANCE;
+ }
+ }
+
+ public int getWaitIntervalMin() {
+ return waitIntervalMin;
+ }
+
+ public void setWaitIntervalMin(int waitIntervalMin) {
+ this.waitIntervalMin = waitIntervalMin;
+ }
+
+ private long convertDateStrToLong(String timestampStr) {
+ LocalDateTime localDateTime = LocalDateTime.parse(timestampStr, DateTimeFormatter.ISO_DATE_TIME);
+ return localDateTime.toInstant(ZoneOffset.ofTotalSeconds(0)).toEpochMilli();
+ }
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistryMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistryMonitor.java
new file mode 100644
index 0000000..30c328d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistryMonitor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically re-register docker container metadata for {@link org.apache.ambari.logfeeder.docker.DockerContainerRegistry}
+ * based on a time interval in seconds (property: logfeeder.container.registry.docker.interval, default: 5)
+ */
+public class DockerContainerRegistryMonitor implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(DockerContainerRegistryMonitor.class);
+
+ private final DockerContainerRegistry registry;
+
+ public DockerContainerRegistryMonitor(DockerContainerRegistry registry) {
+ this.registry = registry;
+ }
+
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ logger.debug("Gather docker containers metadata ...");
+ registry.register();
+ Thread.sleep(1000 * registry.getWaitIntervalMin());
+ } catch (Exception e) {
+ logger.error("Error during gather docker containers metadata.", e);
+ }
+ }
+ }
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerMetadata.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerMetadata.java
new file mode 100644
index 0000000..65842b4
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerMetadata.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker;
+
+import org.apache.ambari.logfeeder.ContainerMetadata;
+
+public class DockerMetadata implements ContainerMetadata {
+
+ private final String id;
+ private final String name;
+ private final String logTypeLabel;
+ private final String logPath;
+ private final String hostName;
+ private final boolean running;
+ private final long timestamp;
+
+ public DockerMetadata(String id, String name, String hostName, String logTypeLabel, String logPath, boolean running, long timestamp) {
+ this.id = id;
+ this.name = name;
+ this.hostName = hostName;
+ this.logTypeLabel = logTypeLabel;
+ this.logPath = logPath;
+ this.running = running;
+ this.timestamp = timestamp;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public String getLogTypeLabel() {
+ return logTypeLabel;
+ }
+
+ public String getLogPath() {
+ return logPath;
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "DockerMetadata{" +
+ "id='" + id + '\'' +
+ ", name='" + name + '\'' +
+ ", logTypeLabel='" + logTypeLabel + '\'' +
+ ", logPath='" + logPath + '\'' +
+ ", hostName='" + hostName + '\'' +
+ '}';
+ }
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandExecutionHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandExecutionHelper.java
new file mode 100644
index 0000000..aa65c60
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandExecutionHelper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker.command;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class CommandExecutionHelper {
+
+ public static CommandResponse executeCommand(List<String> commands, Map<String, String> envMap) throws Exception {
+ ProcessBuilder processBuilder = new ProcessBuilder(commands);
+ Map<String, String> env = processBuilder.environment();
+ if (envMap != null) {
+ env.putAll(envMap);
+ }
+ Process shell = processBuilder.start();
+
+ BufferedReader stdInput = new BufferedReader(new InputStreamReader(shell.getInputStream()));
+ BufferedReader stdError = new BufferedReader(new InputStreamReader(shell.getErrorStream()));
+ List<String> stdOutLines = new ArrayList<>();
+ StringBuilder errOut = new StringBuilder();
+ String s = null;
+ while ((s = stdInput.readLine()) != null) {
+ stdOutLines.add(s);
+ }
+ while ((s = stdError.readLine()) != null) {
+ errOut.append(s);
+ }
+ int exitCode = shell.waitFor();
+
+ return new CommandResponse(exitCode, stdOutLines, errOut.toString());
+ }
+}
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandResponse.java
similarity index 55%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandResponse.java
index b58db6a..7ead791 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandResponse.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.ambari.logfeeder.docker.command;
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import java.util.List;
-public interface InputFileDescriptor extends InputFileBaseDescriptor {
- Integer getDetachIntervalMin();
+/**
+ * Represent a bash command response (stdout as string list, stderr in string and an exit code)
+ */
+public class CommandResponse {
+ private final int exitCode;
+ private final List<String> stdOut;
+ private final String stdErr;
+
+ CommandResponse(int exitCode, List<String> stdOut, String stdErr) {
+ this.exitCode = exitCode;
+ this.stdOut = stdOut;
+ this.stdErr = stdErr;
+ }
- Integer getDetachTimeMin();
+ public int getExitCode() {
+ return exitCode;
+ }
- Integer getPathUpdateIntervalMin();
+ public List<String> getStdOut() {
+ return stdOut;
+ }
- Integer getMaxAgeMin();
+ public String getStdErr() {
+ return stdErr;
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/ContainerCommand.java
similarity index 63%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/ContainerCommand.java
index b58db6a..db3de01 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/ContainerCommand.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.ambari.logfeeder.docker.command;
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import java.util.Map;
-public interface InputFileDescriptor extends InputFileBaseDescriptor {
- Integer getDetachIntervalMin();
-
- Integer getDetachTimeMin();
-
- Integer getPathUpdateIntervalMin();
+/**
+ * Responsible of execute container commands. (like listing or inspecting containers)
+ * @param <RESPONSE_TYPE>
+ */
+public interface ContainerCommand<RESPONSE_TYPE> {
- Integer getMaxAgeMin();
+ /**
+ * Execute a container command
+ * @param params extra parameters for the command
+ * @return return type of the execution - can be anything
+ */
+ RESPONSE_TYPE execute(Map<String, String> params);
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerInspectContainerCommand.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerInspectContainerCommand.java
new file mode 100644
index 0000000..d4fc182
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerInspectContainerCommand.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker.command;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Run 'docker inspect' on container ids - and read response and convert it from json response to a map object
+ */
+public class DockerInspectContainerCommand implements ContainerCommand<List<Map<String, Object>>> {
+
+ private static final Logger logger = LoggerFactory.getLogger(DockerInspectContainerCommand.class);
+
+ @Override
+ public List<Map<String, Object>> execute(Map<String, String> params) {
+ List<String> containerIds = Arrays.asList(params.get("containerIds").split(","));
+ CommandResponse commandResponse = null;
+ List<Map<String, Object>> listResponse = new ArrayList<>();
+ List<String> commandList = new ArrayList<>();
+ commandList.add("/usr/local/bin/docker");
+ commandList.add("inspect");
+ commandList.addAll(containerIds);
+ try {
+ commandResponse = CommandExecutionHelper.executeCommand(commandList, null);
+ if (commandResponse.getExitCode() != 0) {
+ logger.error("Error during inspect containers request: {} (exit code: {})", commandResponse.getStdErr(), commandResponse.getExitCode());
+ } else {
+ String jsonResponse = StringUtils.join(commandResponse.getStdOut(), "");
+ ObjectMapper mapper = new ObjectMapper();
+ listResponse = mapper.readValue(jsonResponse, List.class);
+ }
+ } catch (Exception e) {
+ logger.error("Error during inspect containers request", e);
+ }
+ return listResponse;
+ }
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerListContainerCommand.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerListContainerCommand.java
new file mode 100644
index 0000000..a0596ca
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerListContainerCommand.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker.command;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Run 'docker ps -a -q' (+ logfeeder type filter) and save the response in a string list (container ids)
+ */
+public class DockerListContainerCommand implements ContainerCommand<List<String>> {
+
+ private static final Logger logger = LoggerFactory.getLogger(DockerListContainerCommand.class);
+
+ @Override
+ public List<String> execute(Map<String, String> params) {
+ CommandResponse commandResponse = null;
+ List<String> commandList = new ArrayList<>();
+ commandList.add("/usr/local/bin/docker");
+ commandList.add("ps");
+ commandList.add("-a");
+ commandList.add("-q");
+ // TODO: add --filter="label=logfeeder.log.type"
+ try {
+ commandResponse = CommandExecutionHelper.executeCommand(commandList, null);
+ if (commandResponse.getExitCode() != 0) {
+ logger.error("Error during inspect containers request: {} (exit code: {})", commandResponse.getStdErr(), commandResponse.getExitCode());
+ }
+ } catch (Exception e) {
+ logger.error("Error during inspect containers request", e);
+ }
+ return commandResponse != null ? commandResponse.getStdOut() : null;
+ }
+}
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/resources/log4j.properties
similarity index 52%
copy from ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/resources/log4j.properties
index 16cbb0f..6380ac7 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/resources/log4j.properties
@@ -12,21 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-cluster.name=CL1
-logfeeder.checkpoint.folder=/root/checkpoints
-logfeeder.metrics.collector.hosts=
-logfeeder.config.dir=/root/config/logfeeder/shipper-conf/
-logfeeder.config.files=shipper-conf/global.config.json,\
- shipper-conf/output.config.json
-logfeeder.log.filter.enable=true
-logfeeder.solr.config.interval=5
-logfeeder.solr.core.config.name=history
-logfeeder.solr.zk_connect_string=localhost:9983
-logfeeder.cache.enabled=true
-logfeeder.cache.size=100
-logfeeder.cache.key.field=log_message
-logfeeder.cache.dedup.interval=1000
-logfeeder.cache.last.dedup.enabled=true
-logsearch.config.zk_connect_string=localhost:9983
-logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN
\ No newline at end of file
+log4j.rootLogger=DEBUG, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd'T'HH:mm:ss.SSS} %-5p [%t] - %m%n
\ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index a586510..ed0edcd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -51,6 +51,7 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER
private Filter<PROP_TYPE> firstFilter;
private boolean isClosed;
private String type;
+ private String logType;
private boolean useEventMD5 = false;
private boolean genEventMD5 = true;
private Thread thread;
@@ -238,6 +239,14 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER
this.type = type;
}
+ public String getLogType() {
+ return logType;
+ }
+
+ public void setLogType(String logType) {
+ this.logType = logType;
+ }
+
public boolean isUseEventMD5() {
return useEventMD5;
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index 24bff8d..2544c4c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -54,6 +54,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-logsearch-logfeeder-container-registry</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.3.1</version>
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index 2a23cd7..80b7104 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -297,6 +297,7 @@ public class ConfigHandler implements InputConfigMonitor {
continue;
}
input.setType(source);
+ input.setLogType(inputDescriptor.getType());
input.loadConfig(inputDescriptor);
if (input.isEnabled()) {
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index 80dc163..a988840 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -89,4 +89,7 @@ public class LogFeederConstants {
public static final String CHECKPOINT_EXTENSION_PROPERTY = "logfeeder.checkpoint.extension";
public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp";
+ public static final String DOCKER_CONTAINER_REGISTRY_ENABLED_PROPERTY = "logfeeder.docker.registry.enabled";
+ public static final boolean DOCKER_CONTAINER_REGISTRY_ENABLED_DEFAULT = false;
+
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index cfb199c..ee8cdcb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -19,7 +19,10 @@
package org.apache.ambari.logfeeder.conf;
import com.google.common.collect.Maps;
+import org.apache.ambari.logfeeder.ContainerRegistry;
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistryMonitor;
import org.apache.ambari.logfeeder.input.InputConfigUploader;
import org.apache.ambari.logfeeder.input.InputManagerImpl;
import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
@@ -39,6 +42,7 @@ import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import javax.inject.Inject;
+import java.util.Properties;
@Configuration
@PropertySource(value = {
@@ -99,6 +103,7 @@ public class ApplicationConfig {
@Bean
+ @DependsOn("containerRegistry")
public InputManager inputManager() {
return new InputManagerImpl();
}
@@ -107,4 +112,13 @@ public class ApplicationConfig {
public OutputManager outputManager() {
return new OutputManagerImpl();
}
+
+ @Bean
+ public DockerContainerRegistry containerRegistry() {
+ if (logFeederProps.isDockerContainerRegistryEnabled()) {
+ return DockerContainerRegistry.getInstance(logFeederProps.getProperties());
+ } else {
+ return null;
+ }
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index a75b2d6..9a29f86 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -130,6 +130,16 @@ public class LogFeederProps implements LogFeederProperties {
@Value("${" + LogFeederConstants.CHECKPOINT_FOLDER_PROPERTY + ":/usr/lib/ambari-logsearch-logfeeder/conf/checkpoints}")
public String checkpointFolder;
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.DOCKER_CONTAINER_REGISTRY_ENABLED_PROPERTY,
+ description = "",
+ examples = {"true"},
+ defaultValue = LogFeederConstants.DOCKER_CONTAINER_REGISTRY_ENABLED_DEFAULT + "",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${" + LogFeederConstants.DOCKER_CONTAINER_REGISTRY_ENABLED_PROPERTY + ":false}")
+ public boolean dockerContainerRegistryEnabled;
+
@Inject
private LogEntryCacheConfig logEntryCacheConfig;
@@ -227,6 +237,14 @@ public class LogFeederProps implements LogFeederProperties {
this.solrImplicitRouting = solrImplicitRouting;
}
+ public boolean isDockerContainerRegistryEnabled() {
+ return dockerContainerRegistryEnabled;
+ }
+
+ public void setDockerContainerRegistryEnabled(boolean dockerContainerRegistryEnabled) {
+ this.dockerContainerRegistryEnabled = dockerContainerRegistryEnabled;
+ }
+
@PostConstruct
public void init() {
properties = new Properties();
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java
similarity index 68%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java
index b58db6a..ab13775 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.ambari.logfeeder.filter;
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
-public interface InputFileDescriptor extends InputFileBaseDescriptor {
- Integer getDetachIntervalMin();
+import java.util.Map;
- Integer getDetachTimeMin();
+public class DockerLogFilter {
- Integer getPathUpdateIntervalMin();
+ private DockerLogFilter() {
+ }
- Integer getMaxAgeMin();
+ public static String getLogFromDockerJson(String jsonInput) {
+ Map<String, Object> jsonMap = LogFeederUtil.toJSONObject(jsonInput);
+ return jsonMap.get("log").toString();
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 2074f93..5ed61cc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -23,11 +23,14 @@ import com.google.gson.reflect.TypeToken;
import oi.thekraken.grok.api.Grok;
import oi.thekraken.grok.api.exception.GrokException;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.InputFile;
import org.apache.ambari.logfeeder.plugin.common.MetricData;
import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
@@ -73,6 +76,8 @@ public class FilterGrok extends Filter<LogFeederProps> {
private boolean skipOnError = false;
+ private boolean dockerEnabled = false;
+
@Override
public void init(LogFeederProps logFeederProps) throws Exception {
super.init(logFeederProps);
@@ -83,6 +88,12 @@ public class FilterGrok extends Filter<LogFeederProps> {
sourceField = getFilterDescriptor().getSourceField();
removeSourceField = BooleanUtils.toBooleanDefaultIfNull(getFilterDescriptor().isRemoveSourceField(), removeSourceField);
skipOnError = ((FilterGrokDescriptor) getFilterDescriptor()).isSkipOnError();
+ if (logFeederProps.isDockerContainerRegistryEnabled()) {
+ Input input = getInput();
+ if (input != null && input instanceof InputFile) {
+ dockerEnabled = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor) input.getInputDescriptor()).getDockerEnabled(), false);
+ }
+ }
LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " +
getShortDescription());
@@ -178,6 +189,9 @@ public class FilterGrok extends Filter<LogFeederProps> {
@Override
public void apply(String inputStr, InputMarker inputMarker) throws Exception {
+ if (dockerEnabled) {
+ inputStr = DockerLogFilter.getLogFromDockerJson(inputStr);
+ }
if (grokMessage == null) {
return;
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 726a237..441ce3e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -20,6 +20,9 @@ package org.apache.ambari.logfeeder.input;
import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
+import org.apache.ambari.logfeeder.docker.DockerMetadata;
+import org.apache.ambari.logfeeder.input.monitor.DockerLogFileUpdateMonitor;
import org.apache.ambari.logfeeder.input.monitor.LogFileDetachMonitor;
import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor;
import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
@@ -29,7 +32,6 @@ import org.apache.ambari.logfeeder.input.file.ResumeLineNumberHelper;
import org.apache.ambari.logfeeder.plugin.filter.Filter;
import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.util.FileUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
import org.apache.commons.lang.BooleanUtils;
@@ -81,28 +83,43 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
private Thread thread;
private Thread logFileDetacherThread;
private Thread logFilePathUpdaterThread;
+ private Thread dockerLogFileUpdateMonitorThread;
private ThreadGroup threadGroup;
private boolean multiFolder = false;
+ private boolean dockerLog = false;
+ private boolean dockerLogParent = true;
+ private DockerContainerRegistry dockerContainerRegistry;
private Map<String, List<File>> folderMap;
private Map<String, InputFile> inputChildMap = new HashMap<>();
@Override
public boolean isReady() {
if (!isReady) {
- // Let's try to check whether the file is available
- logFiles = getActualInputLogFiles();
- Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles);
- setFolderMap(foldersMap);
- if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
- if (tail && logFiles.length > 1) {
- LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
- ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath());
+ if (dockerLog) {
+ if (dockerContainerRegistry != null) {
+ Map<String, Map<String, DockerMetadata>> metadataMap = dockerContainerRegistry.getContainerMetadataMap();
+ String logType = getLogType();
+ if (metadataMap.containsKey(logType)) {
+ isReady = true;
+ }
+ } else {
+ LOG.warn("Docker registry is not set, probably docker registry usage is not enabled.");
}
- LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
- isReady = true;
} else {
- LOG.debug(logPath + " file doesn't exist. Ignoring for now");
+ logFiles = getActualInputLogFiles();
+ Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles);
+ setFolderMap(foldersMap);
+ if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
+ if (tail && logFiles.length > 1) {
+ LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
+ ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath());
+ }
+ LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
+ isReady = true;
+ } else {
+ LOG.debug(logPath + " file doesn't exist. Ignoring for now");
+ }
}
}
return isReady;
@@ -150,7 +167,25 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
@Override
public boolean monitor() {
if (isReady()) {
- if (multiFolder) {
+ if (dockerLog && dockerLogParent) {
+ Map<String, Map<String, DockerMetadata>> metadataMap = dockerContainerRegistry.getContainerMetadataMap();
+ String logType = getLogType();
+ threadGroup = new ThreadGroup("docker-parent-" + logType);
+ if (metadataMap.containsKey(logType)) {
+ Map<String, DockerMetadata> dockerMetadataMap = metadataMap.get(logType);
+ for (Map.Entry<String, DockerMetadata> dockerMetadataEntry : dockerMetadataMap.entrySet()) {
+ try {
+ startNewChildDockerInputFileThread(dockerMetadataEntry.getValue());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ dockerLogFileUpdateMonitorThread = new Thread(new DockerLogFileUpdateMonitor((InputFile) this, pathUpdateIntervalMin, detachTimeMin), "docker_logfiles_updater=" + logType);
+ dockerLogFileUpdateMonitorThread.setDaemon(true);
+ dockerLogFileUpdateMonitorThread.start();
+ }
+ }
+ else if (multiFolder) {
try {
threadGroup = new ThreadGroup(getNameForThread());
if (getFolderMap() != null) {
@@ -181,6 +216,7 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
@Override
public InputFileMarker getInputMarker() {
+ // TODO: use this
return null;
}
@@ -190,10 +226,6 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
LOG.info("init() called");
checkPointExtension = logFeederProps.getCheckPointExtension();
-
- // Let's close the file and set it to true after we start monitoring it
- setClosed(true);
- logPath = getInputDescriptor().getPath();
checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)getInputDescriptor()).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS);
detachIntervalMin = (int) ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachIntervalMin(), DEFAULT_DETACH_INTERVAL_MIN * 60);
detachTimeMin = (int) ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachTimeMin(), DEFAULT_DETACH_TIME_MIN * 60);
@@ -201,23 +233,37 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
maxAgeMin = (int) ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getMaxAgeMin(), 0);
boolean initDefaultFields = BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isInitDefaultFields(), false);
setInitDefaultFields(initDefaultFields);
- if (StringUtils.isEmpty(logPath)) {
- LOG.error("path is empty for file input. " + getShortDescription());
- return;
- }
- setFilePath(logPath);
- // Check there can have pattern in folder
- if (getFilePath() != null && getFilePath().contains("/")) {
- int lastIndexOfSlash = getFilePath().lastIndexOf("/");
- String folderBeforeLogName = getFilePath().substring(0, lastIndexOfSlash);
- if (folderBeforeLogName.contains("*")) {
- LOG.info("Found regex in folder path ('" + getFilePath() + "'), will check against multiple folders.");
- setMultiFolder(true);
+ // Let's close the file and set it to true after we start monitoring it
+ setClosed(true);
+ dockerLog = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDockerEnabled(), false);
+ if (dockerLog) {
+ if (logFeederProps.isDockerContainerRegistryEnabled()) {
+ boolean isFileReady = isReady();
+ LOG.info("Container type to monitor " + getType() + ", tail=" + tail + ", isReady=" + isFileReady);
+ } else {
+ LOG.warn("Using docker input, but docker registry usage is not enabled.");
+ }
+ } else {
+ logPath = getInputDescriptor().getPath();
+ if (StringUtils.isEmpty(logPath)) {
+ LOG.error("path is empty for file input. " + getShortDescription());
+ return;
}
+
+ setFilePath(logPath);
+ // Check there can have pattern in folder
+ if (getFilePath() != null && getFilePath().contains("/")) {
+ int lastIndexOfSlash = getFilePath().lastIndexOf("/");
+ String folderBeforeLogName = getFilePath().substring(0, lastIndexOfSlash);
+ if (folderBeforeLogName.contains("*")) {
+ LOG.info("Found regex in folder path ('" + getFilePath() + "'), will check against multiple folders.");
+ setMultiFolder(true);
+ }
+ }
+ boolean isFileReady = isReady();
+ LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady);
}
- boolean isFileReady = isReady();
- LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady);
LogEntryCacheConfig cacheConfig = logFeederProps.getLogEntryCacheConfig();
initCache(
@@ -295,6 +341,37 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
}
}
+ public void startNewChildDockerInputFileThread(DockerMetadata dockerMetadata) throws CloneNotSupportedException {
+ LOG.info("Start docker child input thread - " + dockerMetadata.getLogPath());
+ InputFile clonedObject = (InputFile) this.clone();
+ clonedObject.setDockerLogParent(false);
+ clonedObject.logPath = dockerMetadata.getLogPath();
+ clonedObject.setFilePath(logPath);
+ clonedObject.logFiles = new File[]{new File(dockerMetadata.getLogPath())};
+ clonedObject.setInputChildMap(new HashMap<>());
+ clonedObject.setDockerLogFileUpdateMonitorThread(null);
+ copyFilters(clonedObject, getFirstFilter());
+ Thread thread = new Thread(threadGroup, clonedObject, "file=" + dockerMetadata.getLogPath());
+ clonedObject.setThread(thread);
+ inputChildMap.put(dockerMetadata.getLogPath(), clonedObject);
+ thread.start();
+ }
+
+ public void stopChildDockerInputFileThread(String logPathKey) {
+ LOG.info("Stop child input thread - " + logPathKey);
+ String filePath = new File(logPathKey).getName();
+ if (inputChildMap.containsKey(logPathKey)) {
+ InputFile inputFile = inputChildMap.get(logPathKey);
+ inputFile.setClosed(true);
+ if (inputFile.getThread() != null && inputFile.getThread().isAlive()) {
+ inputFile.getThread().interrupt();
+ }
+ inputChildMap.remove(logPathKey);
+ } else {
+ LOG.warn(logPathKey + " not found as an input child.");
+ }
+ }
+
public void startNewChildInputFileThread(Map.Entry<String, List<File>> folderFileEntry) throws CloneNotSupportedException {
LOG.info("Start child input thread - " + folderFileEntry.getKey());
InputFile clonedObject = (InputFile) this.clone();
@@ -506,8 +583,35 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
this.logFilePathUpdaterThread = logFilePathUpdaterThread;
}
+ public Thread getDockerLogFileUpdateMonitorThread() {
+ return dockerLogFileUpdateMonitorThread;
+ }
+
+ public void setDockerLogFileUpdateMonitorThread(Thread dockerLogFileUpdateMonitorThread) {
+ this.dockerLogFileUpdateMonitorThread = dockerLogFileUpdateMonitorThread;
+ }
+
public Integer getMaxAgeMin() {
return maxAgeMin;
}
+ public void setDockerContainerRegistry(DockerContainerRegistry dockerContainerRegistry) {
+ this.dockerContainerRegistry = dockerContainerRegistry;
+ }
+
+ public DockerContainerRegistry getDockerContainerRegistry() {
+ return dockerContainerRegistry;
+ }
+
+ public boolean isDockerLog() {
+ return dockerLog;
+ }
+
+ public boolean isDockerLogParent() {
+ return dockerLogParent;
+ }
+
+ public void setDockerLogParent(boolean dockerLogParent) {
+ this.dockerLogParent = dockerLogParent;
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
index 40475c6..ea97968 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
@@ -20,6 +20,8 @@ package org.apache.ambari.logfeeder.input;
import com.google.common.annotations.VisibleForTesting;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistryMonitor;
import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor;
import org.apache.ambari.logfeeder.plugin.common.MetricData;
import org.apache.ambari.logfeeder.plugin.input.Input;
@@ -65,6 +67,9 @@ public class InputManagerImpl extends InputManager {
private Thread inputIsReadyMonitor;
@Inject
+ private DockerContainerRegistry dockerContainerRegistry;
+
+ @Inject
private LogFeederProps logFeederProps;
public List<Input> getInputList(String serviceName) {
@@ -127,6 +132,7 @@ public class InputManagerImpl extends InputManager {
public void init() throws Exception {
initCheckPointSettings();
startMonitorThread();
+ startDockerMetadataThread();
}
private void initCheckPointSettings() {
@@ -162,6 +168,13 @@ public class InputManagerImpl extends InputManager {
}
}
+ private void startDockerMetadataThread() {
+ if (logFeederProps.isDockerContainerRegistryEnabled()) {
+ Thread obtaiinDockerMetadataThread = new Thread(new DockerContainerRegistryMonitor(dockerContainerRegistry), "obtain_docker_metadata");
+ obtaiinDockerMetadataThread.start();
+ }
+ }
+
private void startMonitorThread() {
inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
@Override
@@ -199,6 +212,10 @@ public class InputManagerImpl extends InputManager {
public void startInputs(String serviceName) {
for (Input input : inputs.get(serviceName)) {
try {
+ if (input instanceof InputFile) {// apply docker metadata registry
+ InputFile inputFile = (InputFile) input;
+ inputFile.setDockerContainerRegistry(dockerContainerRegistry);
+ }
input.init(logFeederProps);
if (input.isReady()) {
input.monitor();
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java
new file mode 100644
index 0000000..0275827
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
+import org.apache.ambari.logfeeder.docker.DockerMetadata;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Periodically check docker containers metadata registry, stop monitoring container log files if those do not exist or stopped too long time ago.
+ * If it finds a new container log for the specific type, it will start to monitoring it.
+ * <br/>
+ * Use cases:<br/>
+ * - input has not monitored yet - found new container -> start monitoring it <br/>
+ * - input has not monitored yet - found new stopped container -> start monitoring it <br/>
+ * - input has not monitored yet - found new stopped container but log is too old -> do not monitoring it <br/>
+ * - input has monitored already - container stopped - if it's stopped for too long time -> remove it from the monitoed list<br/>
+ * - input has monitored already - container stopped - log is not too old -> keep in the monitored list <br/>
+ * - input has monitored already - container does not exist - remove it from the monitoed list (and all other input with the same log type) <br/>
+ */
+public class DockerLogFileUpdateMonitor extends AbstractLogFileMonitor {
+
+ private Logger LOG = LoggerFactory.getLogger(DockerLogFileUpdateMonitor.class);
+
+ public DockerLogFileUpdateMonitor(InputFile inputFile, int waitInterval, int detachTime) {
+ super(inputFile, waitInterval, detachTime);
+ }
+
+ @Override
+ protected String getStartLog() {
+ return "Start docker component type log files monitor thread for " + getInputFile().getLogType();
+ }
+
+ @Override
+ protected void monitorAndUpdate() throws Exception {
+ DockerContainerRegistry dockerContainerRegistry = getInputFile().getDockerContainerRegistry();
+ Map<String, Map<String, DockerMetadata>> dockerMetadataMapByType = dockerContainerRegistry.getContainerMetadataMap();
+ String logType = getInputFile().getLogType();
+ Map<String, InputFile> copiedChildMap = new HashMap<>(getInputFile().getInputChildMap());
+
+ if (dockerMetadataMapByType.containsKey(logType)) {
+ Map<String, DockerMetadata> dockerMetadataMap = dockerMetadataMapByType.get(logType);
+ for (Map.Entry<String, DockerMetadata> containerEntry : dockerMetadataMap.entrySet()) {
+ String logPath = containerEntry.getValue().getLogPath();
+ String containerId = containerEntry.getValue().getId();
+ long timestamp = containerEntry.getValue().getTimestamp();
+ boolean running = containerEntry.getValue().isRunning();
+ LOG.debug("Found log path: {} (container id: {})", logPath, containerId);
+ if (!copiedChildMap.containsKey(logPath)) {
+ if (!running && isItTooOld(timestamp, new Date().getTime(), getDetachTime())) {
+ LOG.debug("Container with id {} is stopped, won't monitor as it stopped for long time.", containerId);
+ } else {
+ LOG.info("Found new container (id: {}) with new log path: {}", logPath, containerId);
+ getInputFile().startNewChildDockerInputFileThread(containerEntry.getValue());
+ }
+ } else {
+ if (!running && isItTooOld(timestamp, new Date().getTime(), getDetachTime())) {
+ LOG.info("Removing: {}", logPath);
+ getInputFile().stopChildDockerInputFileThread(containerEntry.getKey());
+ }
+ }
+ }
+ } else {
+ if (!copiedChildMap.isEmpty()) {
+ LOG.info("Removing all inputs with type: {}", logType);
+ for (Map.Entry<String, InputFile> inputFileEntry : copiedChildMap.entrySet()) {
+ LOG.info("Removing: {}", inputFileEntry.getKey());
+ getInputFile().stopChildDockerInputFileThread(inputFileEntry.getKey());
+ }
+ }
+ }
+ }
+
+ private boolean isItTooOld(long timestamp, long actualTimestamp, long maxDiffMinutes) {
+ long diff = actualTimestamp - timestamp;
+ long maxDiffMins = maxDiffMinutes * 1000 * 60;
+ return diff > maxDiffMins;
+ }
+}
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
index efa56a2..012455e 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
@@ -40,6 +40,9 @@ public class LSServerInputFile extends LSServerInputFileBase {
@JsonProperty("max_age_min")
private Integer maxAgeMin;
+ @JsonProperty("docker")
+ private Boolean dockerEnabled;
+
public LSServerInputFile() {}
public LSServerInputFile(InputDescriptor inputDescriptor) {
@@ -49,6 +52,7 @@ public class LSServerInputFile extends LSServerInputFileBase {
this.detachTimeMin = inputFileDescriptor.getDetachTimeMin();
this.pathUpdateIntervalMin = inputFileDescriptor.getPathUpdateIntervalMin();
this.maxAgeMin = inputFileDescriptor.getMaxAgeMin();
+ this.dockerEnabled = inputFileDescriptor.getDockerEnabled();
}
public Integer getDetachIntervalMin() {
@@ -82,4 +86,12 @@ public class LSServerInputFile extends LSServerInputFileBase {
public void setMaxAgeMin(Integer maxAgeMin) {
this.maxAgeMin = maxAgeMin;
}
+
+ public Boolean getDockerEnabled() {
+ return dockerEnabled;
+ }
+
+ public void setDockerEnabled(Boolean dockerEnabled) {
+ this.dockerEnabled = dockerEnabled;
+ }
}
diff --git a/ambari-logsearch/docker/docker-compose.yml b/ambari-logsearch/docker/docker-compose.yml
index 99e0f18..b73ee5c 100644
--- a/ambari-logsearch/docker/docker-compose.yml
+++ b/ambari-logsearch/docker/docker-compose.yml
@@ -47,6 +47,8 @@ services:
image: ambari-logsearch:v1.0
restart: always
hostname: logsearch.apache.org
+ labels:
+ logfeeder.log.type: "logsearch_server"
networks:
- logsearch-network
env_file:
@@ -68,6 +70,9 @@ services:
image: ambari-logsearch:v1.0
restart: always
hostname: logfeeder.apache.org
+ privileged: true
+ labels:
+ logfeeder.log.type: "logfeeder"
networks:
- logsearch-network
env_file:
@@ -82,6 +87,9 @@ services:
- $AMBARI_LOCATION:/root/ambari
- $AMBARI_LOCATION/ambari-logsearch/docker/test-logs:/root/test-logs
- $AMBARI_LOCATION/ambari-logsearch/docker/test-config:/root/test-config
+ - /var/run/docker.sock:/var/run/docker.sock
+ - /usr/local/bin/docker:/usr/local/bin/docker
+ - /var/lib/docker:/var/lib/docker
networks:
logsearch-network:
driver: bridge
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
index 16cbb0f..850aca2 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
@@ -29,4 +29,5 @@ logfeeder.cache.key.field=log_message
logfeeder.cache.dedup.interval=1000
logfeeder.cache.last.dedup.enabled=true
logsearch.config.zk_connect_string=localhost:9983
-logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN
\ No newline at end of file
+logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN
+logfeeder.docker.registry.enabled=true
\ No newline at end of file
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-logsearch-docker.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-logsearch-docker.json
new file mode 100644
index 0000000..a420960
--- /dev/null
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-logsearch-docker.json
@@ -0,0 +1,31 @@
+{
+ "input": [
+ {
+ "type": "logsearch_server",
+ "rowtype": "service",
+ "docker": "true"
+ }
+ ],
+ "filter": [
+ {
+ "filter": "grok",
+ "conditions": {
+ "fields": {
+ "type": [
+ "logsearch_server"
+ ]
+ }
+ },
+ "log4j_format": "",
+ "multiline_pattern": "^(%{DATESTAMP:logtime})",
+ "message_pattern": "(?m)^%{DATESTAMP:logtime}%{SPACE}\\[%{DATA:thread_name}\\]%{SPACE}%{LOGLEVEL:level}%{SPACE}%{JAVACLASS}%{SPACE}\\(%{JAVAFILE:file}:%{INT:line_number}\\)%{SPACE}-%{SPACE}%{GREEDYDATA:log_message}",
+ "post_map_values": {
+ "logtime": {
+ "map_date": {
+ "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
+ }
+ }
+ }
+ }
+ ]
+}
diff --git a/ambari-logsearch/pom.xml b/ambari-logsearch/pom.xml
index 5fbbb33..6d5f9f1 100644
--- a/ambari-logsearch/pom.xml
+++ b/ambari-logsearch/pom.xml
@@ -33,6 +33,7 @@
<module>ambari-logsearch-config-zookeeper</module>
<module>ambari-logsearch-it</module>
<module>ambari-logsearch-logfeeder-plugin-api</module>
+ <module>ambari-logsearch-logfeeder-container-registry</module>
</modules>
<properties>
<jdk.version>1.8</jdk.version>