You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/07/19 13:40:44 UTC

[ambari] branch branch-2.7 updated: AMBARI-24183. Log Feeder: read and ship docker container logs.(#1799)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 83c8063  AMBARI-24183. Log Feeder: read and ship docker container logs.(#1799)
83c8063 is described below

commit 83c8063e593f0a67ac247c6d3e03876c4a8adb18
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Thu Jul 19 15:40:42 2018 +0200

    AMBARI-24183. Log Feeder: read and ship docker container logs.(#1799)
---
 .../api/model/inputconfig/InputFileDescriptor.java |   2 +
 .../inputconfig/impl/InputFileDescriptorImpl.java  |  20 +++
 .../pom.xml                                        |  83 +++++++++++
 .../ambari/logfeeder/ContainerMetadata.java}       |  42 +++++-
 .../ambari/logfeeder/ContainerRegistry.java}       |  26 +++-
 .../logfeeder/docker/DockerContainerRegistry.java  | 145 ++++++++++++++++++
 .../docker/DockerContainerRegistryMonitor.java     |  50 +++++++
 .../ambari/logfeeder/docker/DockerMetadata.java    |  81 ++++++++++
 .../docker/command/CommandExecutionHelper.java     |  52 +++++++
 .../logfeeder/docker/command/CommandResponse.java} |  34 ++++-
 .../docker/command/ContainerCommand.java}          |  25 ++--
 .../command/DockerInspectContainerCommand.java     |  61 ++++++++
 .../docker/command/DockerListContainerCommand.java |  54 +++++++
 .../src/main/resources/log4j.properties}           |  23 +--
 .../ambari/logfeeder/plugin/input/Input.java       |   9 ++
 .../ambari-logsearch-logfeeder/pom.xml             |   5 +
 .../ambari/logfeeder/common/ConfigHandler.java     |   1 +
 .../logfeeder/common/LogFeederConstants.java       |   3 +
 .../ambari/logfeeder/conf/ApplicationConfig.java   |  14 ++
 .../ambari/logfeeder/conf/LogFeederProps.java      |  18 +++
 .../ambari/logfeeder/filter/DockerLogFilter.java}  |  20 ++-
 .../apache/ambari/logfeeder/filter/FilterGrok.java |  14 ++
 .../apache/ambari/logfeeder/input/InputFile.java   | 166 +++++++++++++++++----
 .../ambari/logfeeder/input/InputManagerImpl.java   |  17 +++
 .../input/monitor/DockerLogFileUpdateMonitor.java  | 101 +++++++++++++
 .../logsearch/model/common/LSServerInputFile.java  |  12 ++
 ambari-logsearch/docker/docker-compose.yml         |   8 +
 .../test-config/logfeeder/logfeeder.properties     |   3 +-
 .../input.config-logsearch-docker.json             |  31 ++++
 ambari-logsearch/pom.xml                           |   1 +
 30 files changed, 1029 insertions(+), 92 deletions(-)

diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
index b58db6a..2689f82 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
@@ -27,4 +27,6 @@ public interface InputFileDescriptor extends InputFileBaseDescriptor {
   Integer getPathUpdateIntervalMin();
 
   Integer getMaxAgeMin();
+
+  Boolean getDockerEnabled();
 }
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
index 99b42fe..2ba53e6 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
@@ -70,6 +70,17 @@ public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl impleme
   @SerializedName("max_age_min")
   private Integer maxAgeMin;
 
+  @ShipperConfigElementDescription(
+    path = "/input/[]/docker",
+    type = "boolean",
+    description = "Input comes from a docker container.",
+    examples = {"true", "false"},
+    defaultValue = "false"
+  )
+  @Expose
+  @SerializedName("docker")
+  private Boolean dockerEnabled;
+
   @Override
   public Integer getDetachIntervalMin() {
     return this.detachIntervalMin;
@@ -90,6 +101,11 @@ public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl impleme
     return this.maxAgeMin;
   }
 
+  @Override
+  public Boolean getDockerEnabled() {
+    return dockerEnabled;
+  }
+
   public void setDetachIntervalMin(Integer detachIntervalMin) {
     this.detachIntervalMin = detachIntervalMin;
   }
@@ -105,4 +121,8 @@ public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl impleme
   public void setMaxAgeMin(Integer maxAgeMin) {
     this.maxAgeMin = maxAgeMin;
   }
+
+  public void setDockerEnabled(Boolean dockerEnabled) {
+    this.dockerEnabled = dockerEnabled;
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/pom.xml
new file mode 100644
index 0000000..66333f5
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>ambari-logsearch</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>2.0.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <packaging>jar</packaging>
+  <artifactId>ambari-logsearch-logfeeder-container-registry</artifactId>
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.25</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.7.25</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.9.4</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>2.9.4</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.6</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.3</version>
+        <configuration>
+          <source>${jdk.version}</source>
+          <target>${jdk.version}</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>**/log4j.properties</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerMetadata.java
similarity index 54%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerMetadata.java
index b58db6a..df3a80a 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerMetadata.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,41 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.ambari.logfeeder;
 
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+/**
+ * Holds container related metadata
+ **/
+public interface ContainerMetadata {
 
-public interface InputFileDescriptor extends InputFileBaseDescriptor {
-  Integer getDetachIntervalMin();
+  /**
+   * Id of the container, used for getting the right log path
+   * @return container id
+   */
+  String getId();
 
-  Integer getDetachTimeMin();
+  /**
+   * Name of the container
+   * @return container name
+   */
+  String getName();
 
-  Integer getPathUpdateIntervalMin();
+  /**
+   * Hostname of the container, can be container host itself or the actual hostname
+   * @return container host name
+   */
+  String getHostName();
+
+  /**
+   * Log label
+   * @return log type label
+   */
+  String getLogTypeLabel();
+
+  /**
+   * Log path of the container (should be json file)
+   * @return log path
+   */
+  String getLogPath();
 
-  Integer getMaxAgeMin();
 }
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerRegistry.java
similarity index 58%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerRegistry.java
index b58db6a..94f6a82 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerRegistry.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.ambari.logfeeder;
 
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import java.util.Map;
 
-public interface InputFileDescriptor extends InputFileBaseDescriptor {
-  Integer getDetachIntervalMin();
+/**
+ * Responsible for registering new containers and dropping ones that no longer exist.
+ * @param <METADATA_TYPE> type of metadata - could be docker or other container implementation
+ */
+public interface ContainerRegistry<METADATA_TYPE extends ContainerMetadata> {
 
-  Integer getDetachTimeMin();
+  /**
+   * Register process of running containers
+   */
+  void register();
 
-  Integer getPathUpdateIntervalMin();
+  /**
+   * Holds container metadata per log component type and container id.
+   * @return container metadata
+   */
+  Map<String, Map<String, METADATA_TYPE>> getContainerMetadataMap();
 
-  Integer getMaxAgeMin();
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistry.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistry.java
new file mode 100644
index 0000000..c3e816e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistry.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker;
+
+import org.apache.ambari.logfeeder.ContainerRegistry;
+import org.apache.ambari.logfeeder.docker.command.DockerInspectContainerCommand;
+import org.apache.ambari.logfeeder.docker.command.DockerListContainerCommand;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * Registers docker metadata of the docker containers on the host (by listing the containers and inspecting them)
+ */
+public final class DockerContainerRegistry implements ContainerRegistry<DockerMetadata> {
+
+  private static final String LOGFEEDER_CONTAINER_REGISTRY_DOCKER_INTERVAL = "logfeeder.container.registry.docker.interval";
+  private static final Logger logger = LoggerFactory.getLogger(DockerContainerRegistry.class);
+
+  private static DockerContainerRegistry INSTANCE = null;
+  private final Properties configs;
+  private Map<String, Map<String, DockerMetadata>> dockerMetadataMap = new ConcurrentHashMap<>();
+  private int waitIntervalMin = 5;
+
+  private DockerContainerRegistry(Properties configs) {
+    this.configs = configs;
+    init(configs);
+  }
+
+  /** Refreshes the metadata snapshot from the currently listed docker containers. */
+  @Override
+  public synchronized void register() {
+    Map<String, Map<String, DockerMetadata>> actualDockerMetadataMap = renewMetadata();
+    // An empty result leaves the previous snapshot untouched.
+    if (!actualDockerMetadataMap.isEmpty()) {
+      // Replace the snapshot: component types without containers disappear, and the
+      // field stays a concurrent map as it is declared.
+      dockerMetadataMap = new ConcurrentHashMap<>(actualDockerMetadataMap);
+
+      for (Map.Entry<String, Map<String, DockerMetadata>> entry : dockerMetadataMap.entrySet()) {
+        for (Map.Entry<String, DockerMetadata> metadataEntry : entry.getValue().entrySet()) {
+          // log each container once (not the whole component map per container)
+          logger.debug("Found container metadata: {}", metadataEntry.getValue());
+        }
+      }
+    }
+  }
+
+  // Lists the containers, inspects them and groups the metadata of the labelled
+  // ones by component type ('logfeeder.log.type' label) and then by container id.
+  @SuppressWarnings("unchecked") // the inspect output is handled as untyped maps
+  private Map<String, Map<String, DockerMetadata>> renewMetadata() {
+    final Map<String, Map<String, DockerMetadata>> actualDockerMetadataMap = new HashMap<>();
+    final List<String> containerIds = new DockerListContainerCommand().execute(null);
+    final Map<String, String> params = new HashMap<>();
+
+    params.put("containerIds", StringUtils.join(containerIds, ","));
+    List<Map<String, Object>> containerDataList = new DockerInspectContainerCommand().execute(params);
+
+    for (Map<String, Object> containerDataMap : containerDataList) {
+      String id = containerDataMap.get("Id").toString();
+      String name = containerDataMap.get("Name").toString();
+      Map<String, Object> dockerConfigMap = (Map<String, Object>) containerDataMap.get("Config");
+      // guard against a missing/null "Labels" entry; treat it like "no log type label"
+      Map<String, String> labels = (Map<String, String>) dockerConfigMap.get("Labels");
+      String componentType = labels == null ? null : labels.get("logfeeder.log.type");
+      if (componentType == null) {
+        logger.debug("Ignoring docker metadata from registry as container (id: {}, name: {}) as it has no 'logfeeder.log.type' label", id, name);
+        continue;
+      }
+
+      String logPath = containerDataMap.get("LogPath").toString();
+      String hostname = dockerConfigMap.get("Hostname").toString();
+      Map<String, Object> stateMap = (Map<String, Object>) containerDataMap.get("State");
+      boolean running = (Boolean) stateMap.get("Running");
+      // running containers are stamped with their start time, stopped ones with their finish time
+      long timestamp = convertDateStrToLong((String) stateMap.get(running ? "StartedAt" : "FinishedAt"));
+
+      actualDockerMetadataMap
+        .computeIfAbsent(componentType, type -> new HashMap<>())
+        .put(id, new DockerMetadata(id, name, hostname, componentType, logPath, running, timestamp));
+    }
+
+    return actualDockerMetadataMap;
+  }
+
+  @Override
+  public synchronized Map<String, Map<String, DockerMetadata>> getContainerMetadataMap() {
+    return dockerMetadataMap;
+  }
+
+  public void init(Properties configs) {
+    // init docker related data
+    String waitStr = configs.getProperty(LOGFEEDER_CONTAINER_REGISTRY_DOCKER_INTERVAL, "5");
+    setWaitIntervalMin(Integer.parseInt(waitStr));
+    // TODO: add docker authentication settings through this
+  }
+
+  public static synchronized DockerContainerRegistry getInstance(Properties dockerConfig) {
+    if (INSTANCE == null) {
+      // remember the instance; returning a fresh registry on every call defeats the singleton
+      INSTANCE = new DockerContainerRegistry(dockerConfig);
+    }
+    return INSTANCE;
+  }
+
+  public int getWaitIntervalMin() {
+    return waitIntervalMin;
+  }
+
+  public void setWaitIntervalMin(int waitIntervalMin) {
+    this.waitIntervalMin = waitIntervalMin;
+  }
+
+  private long convertDateStrToLong(String timestampStr) {
+    // the parsed local date-time is interpreted as UTC and returned as epoch milliseconds
+    return LocalDateTime.parse(timestampStr, DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli();
+  }
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistryMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistryMonitor.java
new file mode 100644
index 0000000..30c328d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistryMonitor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically re-registers docker container metadata for {@link org.apache.ambari.logfeeder.docker.DockerContainerRegistry}
+ * based on a time interval in seconds (property: logfeeder.container.registry.docker.interval, default: 5)
+ */
+public class DockerContainerRegistryMonitor implements Runnable {
+  private static final Logger logger = LoggerFactory.getLogger(DockerContainerRegistryMonitor.class);
+  private final DockerContainerRegistry registry;
+
+  public DockerContainerRegistryMonitor(DockerContainerRegistry registry) {
+    this.registry = registry;
+  }
+
+  @Override
+  public void run() {
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        logger.debug("Gather docker containers metadata ...");
+        registry.register();
+        Thread.sleep(1000L * registry.getWaitIntervalMin()); // the configured value is applied as seconds
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // sleep() cleared the flag; restore it so the loop ends
+      } catch (Exception e) {
+        logger.error("Error during gather docker containers metadata.", e);
+      }
+    }
+  }
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerMetadata.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerMetadata.java
new file mode 100644
index 0000000..65842b4
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerMetadata.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker;
+
+import org.apache.ambari.logfeeder.ContainerMetadata;
+
+public class DockerMetadata implements ContainerMetadata { // immutable value holder for one container
+  private final String id;
+  private final String name;
+  private final String logTypeLabel;
+  private final String logPath;
+  private final String hostName;
+  private final boolean running; // whether the container was reported as running
+  private final long timestamp; // numeric time value supplied by the creator of this object
+
+  public DockerMetadata(String id, String name, String hostName, String logTypeLabel, String logPath, boolean running, long timestamp) {
+    this.id = id;
+    this.name = name;
+    this.hostName = hostName;
+    this.logTypeLabel = logTypeLabel;
+    this.logPath = logPath;
+    this.running = running;
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String getHostName() {
+    return hostName;
+  }
+
+  @Override
+  public String getLogTypeLabel() {
+    return logTypeLabel;
+  }
+
+  @Override
+  public String getLogPath() {
+    return logPath;
+  }
+
+  public boolean isRunning() {
+    return running;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public String toString() {
+    return "DockerMetadata{id='" + id + "', name='" + name + "', logTypeLabel='" + logTypeLabel
+      + "', logPath='" + logPath + "', hostName='" + hostName + "', running=" + running
+      + ", timestamp=" + timestamp + '}';
+  }
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandExecutionHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandExecutionHelper.java
new file mode 100644
index 0000000..aa65c60
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandExecutionHelper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker.command;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class CommandExecutionHelper {
+  /** Runs the command with the extra environment entries; returns exit code, stdout lines and stderr text. */
+  public static CommandResponse executeCommand(List<String> commands, Map<String, String> envMap) throws Exception {
+    ProcessBuilder processBuilder = new ProcessBuilder(commands);
+    if (envMap != null) {
+      processBuilder.environment().putAll(envMap);
+    }
+    Process shell = processBuilder.start();
+    List<String> stdOutLines = new ArrayList<>();
+    StringBuilder errOut = new StringBuilder();
+    // try-with-resources closes the process streams even when reading fails
+    // NOTE(review): stdout is drained before stderr; a very chatty stderr could block the child process
+    try (BufferedReader stdInput = new BufferedReader(new InputStreamReader(shell.getInputStream()));
+         BufferedReader stdError = new BufferedReader(new InputStreamReader(shell.getErrorStream()))) {
+      String s;
+      while ((s = stdInput.readLine()) != null) {
+        stdOutLines.add(s);
+      }
+      while ((s = stdError.readLine()) != null) {
+        errOut.append(s);
+      }
+    }
+    int exitCode = shell.waitFor();
+    return new CommandResponse(exitCode, stdOutLines, errOut.toString());
+  }
+}
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandResponse.java
similarity index 55%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandResponse.java
index b58db6a..7ead791 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandResponse.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.ambari.logfeeder.docker.command;
 
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import java.util.List;
 
-public interface InputFileDescriptor extends InputFileBaseDescriptor {
-  Integer getDetachIntervalMin();
+/**
+ * Represents a bash command response (stdout as a string list, stderr as a string and an exit code)
+ */
+public class CommandResponse {
+  private final int exitCode;
+  private final List<String> stdOut;
+  private final String stdErr;
+
+  CommandResponse(int exitCode, List<String> stdOut, String stdErr) {
+    this.exitCode = exitCode;
+    this.stdOut = stdOut;
+    this.stdErr = stdErr;
+  }
 
-  Integer getDetachTimeMin();
+  public int getExitCode() {
+    return exitCode;
+  }
 
-  Integer getPathUpdateIntervalMin();
+  public List<String> getStdOut() {
+    return stdOut;
+  }
 
-  Integer getMaxAgeMin();
+  public String getStdErr() {
+    return stdErr;
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/ContainerCommand.java
similarity index 63%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/ContainerCommand.java
index b58db6a..db3de01 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/ContainerCommand.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.ambari.logfeeder.docker.command;
 
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import java.util.Map;
 
-public interface InputFileDescriptor extends InputFileBaseDescriptor {
-  Integer getDetachIntervalMin();
-
-  Integer getDetachTimeMin();
-
-  Integer getPathUpdateIntervalMin();
+/**
+ * Responsible for executing container commands (like listing or inspecting containers).
+ * @param <RESPONSE_TYPE> type of the result returned by the command
+ */
+public interface ContainerCommand<RESPONSE_TYPE> {
 
-  Integer getMaxAgeMin();
+  /**
+   * Execute a container command
+   * @param params extra parameters for the command
+   * @return return type of the execution - can be anything
+   */
+  RESPONSE_TYPE execute(Map<String, String> params);
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerInspectContainerCommand.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerInspectContainerCommand.java
new file mode 100644
index 0000000..d4fc182
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerInspectContainerCommand.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker.command;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Run 'docker inspect' on the container ids given in the "containerIds" parameter (comma separated) and convert the
+ * json response to a list of maps. An empty list is returned if no ids are given, the command fails or parsing fails.
+ */
+public class DockerInspectContainerCommand implements ContainerCommand<List<Map<String, Object>>> {
+
+  private static final Logger logger = LoggerFactory.getLogger(DockerInspectContainerCommand.class);
+
+  @Override
+  public List<Map<String, Object>> execute(Map<String, String> params) {
+    List<Map<String, Object>> listResponse = new ArrayList<>();
+    String containerIdsParam = params == null ? null : params.get("containerIds");
+    if (StringUtils.isBlank(containerIdsParam)) {
+      return listResponse; // nothing to inspect - also avoids an NPE on a missing "containerIds" parameter
+    }
+    List<String> commandList = new ArrayList<>(Arrays.asList("/usr/local/bin/docker", "inspect"));
+    commandList.addAll(Arrays.asList(containerIdsParam.split(",")));
+    try {
+      CommandResponse commandResponse = CommandExecutionHelper.executeCommand(commandList, null);
+      if (commandResponse.getExitCode() != 0) {
+        logger.error("Error during inspect containers request: {} (exit code: {})", commandResponse.getStdErr(), commandResponse.getExitCode());
+      } else {
+        String jsonResponse = StringUtils.join(commandResponse.getStdOut(), ""); // stdout lines -> one json document
+        listResponse = new ObjectMapper().readValue(jsonResponse, List.class);
+      }
+    } catch (Exception e) {
+      logger.error("Error during inspect containers request", e);
+    }
+    return listResponse;
+  }
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerListContainerCommand.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerListContainerCommand.java
new file mode 100644
index 0000000..a0596ca
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerListContainerCommand.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.docker.command;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Run 'docker ps -a -q' and return its stdout lines (container ids); the logfeeder type filter is still a TODO
+ */
+public class DockerListContainerCommand implements ContainerCommand<List<String>> {
+
+  private static final Logger logger = LoggerFactory.getLogger(DockerListContainerCommand.class);
+
+  @Override // params is not used by this command; null is returned if the command could not be executed at all
+  public List<String> execute(Map<String, String> params) {
+    CommandResponse commandResponse = null;
+    List<String> commandList = new ArrayList<>();
+    commandList.add("/usr/local/bin/docker");
+    commandList.add("ps");
+    commandList.add("-a");
+    commandList.add("-q");
+    // TODO: add --filter="label=logfeeder.log.type"
+    try {
+      commandResponse = CommandExecutionHelper.executeCommand(commandList, null);
+      if (commandResponse.getExitCode() != 0) {
+        logger.error("Error during list containers request: {} (exit code: {})", commandResponse.getStdErr(), commandResponse.getExitCode());
+      }
+    } catch (Exception e) {
+      logger.error("Error during list containers request", e);
+    }
+    return commandResponse != null ? commandResponse.getStdOut() : null;
+  }
+}
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/resources/log4j.properties
similarity index 52%
copy from ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/resources/log4j.properties
index 16cbb0f..6380ac7 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/resources/log4j.properties
@@ -12,21 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-cluster.name=CL1
-logfeeder.checkpoint.folder=/root/checkpoints
-logfeeder.metrics.collector.hosts=
-logfeeder.config.dir=/root/config/logfeeder/shipper-conf/
-logfeeder.config.files=shipper-conf/global.config.json,\
-  shipper-conf/output.config.json
-logfeeder.log.filter.enable=true
-logfeeder.solr.config.interval=5
-logfeeder.solr.core.config.name=history
-logfeeder.solr.zk_connect_string=localhost:9983
-logfeeder.cache.enabled=true
-logfeeder.cache.size=100
-logfeeder.cache.key.field=log_message
-logfeeder.cache.dedup.interval=1000
-logfeeder.cache.last.dedup.enabled=true
-logsearch.config.zk_connect_string=localhost:9983
-logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN
\ No newline at end of file
+log4j.rootLogger=DEBUG, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd'T'HH:mm:ss.SSS} %-5p [%t] - %m%n
\ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index a586510..ed0edcd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -51,6 +51,7 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER
   private Filter<PROP_TYPE> firstFilter;
   private boolean isClosed;
   private String type;
+  private String logType;
   private boolean useEventMD5 = false;
   private boolean genEventMD5 = true;
   private Thread thread;
@@ -238,6 +239,14 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER
     this.type = type;
   }
 
+  public String getLogType() {
+    return logType;
+  }
+
+  public void setLogType(String logType) {
+    this.logType = logType;
+  }
+
   public boolean isUseEventMD5() {
     return useEventMD5;
   }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index 24bff8d..2544c4c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -54,6 +54,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-logsearch-logfeeder-container-registry</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
       <version>1.3.1</version>
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index 2a23cd7..80b7104 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -297,6 +297,7 @@ public class ConfigHandler implements InputConfigMonitor {
         continue;
       }
       input.setType(source);
+      input.setLogType(inputDescriptor.getType());
       input.loadConfig(inputDescriptor);
 
       if (input.isEnabled()) {
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index 80dc163..a988840 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -89,4 +89,7 @@ public class LogFeederConstants {
   public static final String CHECKPOINT_EXTENSION_PROPERTY = "logfeeder.checkpoint.extension";
   public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp";
 
+  public static final String DOCKER_CONTAINER_REGISTRY_ENABLED_PROPERTY = "logfeeder.docker.registry.enabled";
+  public static final boolean DOCKER_CONTAINER_REGISTRY_ENABLED_DEFAULT = false;
+
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index cfb199c..ee8cdcb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -19,7 +19,10 @@
 package org.apache.ambari.logfeeder.conf;
 
 import com.google.common.collect.Maps;
+import org.apache.ambari.logfeeder.ContainerRegistry;
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistryMonitor;
 import org.apache.ambari.logfeeder.input.InputConfigUploader;
 import org.apache.ambari.logfeeder.input.InputManagerImpl;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
@@ -39,6 +42,7 @@ import org.springframework.context.annotation.PropertySource;
 import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
 
 import javax.inject.Inject;
+import java.util.Properties;
 
 @Configuration
 @PropertySource(value = {
@@ -99,6 +103,7 @@ public class ApplicationConfig {
 
 
   @Bean
+  @DependsOn("containerRegistry")
   public InputManager inputManager() {
     return new InputManagerImpl();
   }
@@ -107,4 +112,13 @@ public class ApplicationConfig {
   public OutputManager outputManager() {
     return new OutputManagerImpl();
   }
+
+  @Bean // docker container registry; only created when logfeeder.docker.registry.enabled is true
+  public DockerContainerRegistry containerRegistry() {
+    if (logFeederProps.isDockerContainerRegistryEnabled()) {
+      return DockerContainerRegistry.getInstance(logFeederProps.getProperties());
+    } else {
+      return null; // registry disabled: consumers have to cope with a missing registry (InputFile.isReady() only warns)
+    }
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index a75b2d6..9a29f86 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -130,6 +130,16 @@ public class LogFeederProps implements LogFeederProperties {
   @Value("${" + LogFeederConstants.CHECKPOINT_FOLDER_PROPERTY + ":/usr/lib/ambari-logsearch-logfeeder/conf/checkpoints}")
   public String checkpointFolder;
 
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.DOCKER_CONTAINER_REGISTRY_ENABLED_PROPERTY,
+    description = "Enable the docker container registry, which is required to read the logs of docker enabled file inputs.",
+    examples = {"true"},
+    defaultValue = LogFeederConstants.DOCKER_CONTAINER_REGISTRY_ENABLED_DEFAULT + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.DOCKER_CONTAINER_REGISTRY_ENABLED_PROPERTY + ":false}")
+  public boolean dockerContainerRegistryEnabled; // gates the DockerContainerRegistry bean (see ApplicationConfig)
+
   @Inject
   private LogEntryCacheConfig logEntryCacheConfig;
 
@@ -227,6 +237,14 @@ public class LogFeederProps implements LogFeederProperties {
     this.solrImplicitRouting = solrImplicitRouting;
   }
 
+  public boolean isDockerContainerRegistryEnabled() {
+    return dockerContainerRegistryEnabled;
+  }
+
+  public void setDockerContainerRegistryEnabled(boolean dockerContainerRegistryEnabled) {
+    this.dockerContainerRegistryEnabled = dockerContainerRegistryEnabled;
+  }
+
   @PostConstruct
   public void init() {
     properties = new Properties();
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java
similarity index 68%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java
index b58db6a..ab13775 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.ambari.logfeeder.filter;
 
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 
-public interface InputFileDescriptor extends InputFileBaseDescriptor {
-  Integer getDetachIntervalMin();
+import java.util.Map;
 
-  Integer getDetachTimeMin();
+public class DockerLogFilter {
 
-  Integer getPathUpdateIntervalMin();
+  private DockerLogFilter() {
+  }
 
-  Integer getMaxAgeMin();
+  public static String getLogFromDockerJson(String jsonInput) { // returns the "log" field of a json log line
+    Map<String, Object> jsonMap = LogFeederUtil.toJSONObject(jsonInput);
+    return jsonMap.get("log").toString(); // NOTE(review): NPE if there is no "log" entry (or no map) - confirm callers only pass docker json lines
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 2074f93..5ed61cc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -23,11 +23,14 @@ import com.google.gson.reflect.TypeToken;
 import oi.thekraken.grok.api.Grok;
 import oi.thekraken.grok.api.exception.GrokException;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.InputFile;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
 import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
@@ -73,6 +76,8 @@ public class FilterGrok extends Filter<LogFeederProps> {
 
   private boolean skipOnError = false;
 
+  private boolean dockerEnabled = false;
+
   @Override
   public void init(LogFeederProps logFeederProps) throws Exception {
     super.init(logFeederProps);
@@ -83,6 +88,12 @@ public class FilterGrok extends Filter<LogFeederProps> {
       sourceField = getFilterDescriptor().getSourceField();
       removeSourceField = BooleanUtils.toBooleanDefaultIfNull(getFilterDescriptor().isRemoveSourceField(), removeSourceField);
       skipOnError = ((FilterGrokDescriptor) getFilterDescriptor()).isSkipOnError();
+      if (logFeederProps.isDockerContainerRegistryEnabled()) {
+        Input input = getInput();
+        if (input != null && input instanceof InputFile) {
+          dockerEnabled = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor) input.getInputDescriptor()).getDockerEnabled(), false);
+        }
+      }
 
       LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " +
       getShortDescription());
@@ -178,6 +189,9 @@ public class FilterGrok extends Filter<LogFeederProps> {
 
   @Override
   public void apply(String inputStr, InputMarker inputMarker) throws Exception {
+    if (dockerEnabled) {
+      inputStr = DockerLogFilter.getLogFromDockerJson(inputStr);
+    }
     if (grokMessage == null) {
       return;
     }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 726a237..441ce3e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -20,6 +20,9 @@ package org.apache.ambari.logfeeder.input;
 
 import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
+import org.apache.ambari.logfeeder.docker.DockerMetadata;
+import org.apache.ambari.logfeeder.input.monitor.DockerLogFileUpdateMonitor;
 import org.apache.ambari.logfeeder.input.monitor.LogFileDetachMonitor;
 import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor;
 import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
@@ -29,7 +32,6 @@ import org.apache.ambari.logfeeder.input.file.ResumeLineNumberHelper;
 import org.apache.ambari.logfeeder.plugin.filter.Filter;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.util.FileUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
 import org.apache.commons.lang.BooleanUtils;
@@ -81,28 +83,43 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
   private Thread thread;
   private Thread logFileDetacherThread;
   private Thread logFilePathUpdaterThread;
+  private Thread dockerLogFileUpdateMonitorThread;
   private ThreadGroup threadGroup;
 
   private boolean multiFolder = false;
+  private boolean dockerLog = false;
+  private boolean dockerLogParent = true;
+  private DockerContainerRegistry dockerContainerRegistry;
   private Map<String, List<File>> folderMap;
   private Map<String, InputFile> inputChildMap = new HashMap<>();
 
   @Override
   public boolean isReady() {
     if (!isReady) {
-      // Let's try to check whether the file is available
-      logFiles = getActualInputLogFiles();
-      Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles);
-      setFolderMap(foldersMap);
-      if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
-        if (tail && logFiles.length > 1) {
-          LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
-            ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath());
+      if (dockerLog) {
+        if (dockerContainerRegistry != null) {
+          Map<String, Map<String, DockerMetadata>> metadataMap = dockerContainerRegistry.getContainerMetadataMap();
+          String logType = getLogType();
+          if (metadataMap.containsKey(logType)) {
+            isReady = true;
+          }
+        } else {
+          LOG.warn("Docker registry is not set, probably docker registry usage is not enabled.");
         }
-        LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
-        isReady = true;
       } else {
-        LOG.debug(logPath + " file doesn't exist. Ignoring for now");
+        logFiles = getActualInputLogFiles();
+        Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles);
+        setFolderMap(foldersMap);
+        if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
+          if (tail && logFiles.length > 1) {
+            LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
+              ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath());
+          }
+          LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
+          isReady = true;
+        } else {
+          LOG.debug(logPath + " file doesn't exist. Ignoring for now");
+        }
       }
     }
     return isReady;
@@ -150,7 +167,25 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
   @Override
   public boolean monitor() {
     if (isReady()) {
-      if (multiFolder) {
+      if (dockerLog && dockerLogParent) {
+        Map<String, Map<String, DockerMetadata>> metadataMap = dockerContainerRegistry.getContainerMetadataMap();
+        String logType = getLogType();
+        threadGroup = new ThreadGroup("docker-parent-" + logType);
+        if (metadataMap.containsKey(logType)) {
+          Map<String, DockerMetadata> dockerMetadataMap = metadataMap.get(logType);
+          for (Map.Entry<String, DockerMetadata> dockerMetadataEntry : dockerMetadataMap.entrySet()) {
+            try {
+              startNewChildDockerInputFileThread(dockerMetadataEntry.getValue());
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+          dockerLogFileUpdateMonitorThread = new Thread(new DockerLogFileUpdateMonitor((InputFile) this, pathUpdateIntervalMin, detachTimeMin), "docker_logfiles_updater=" + logType);
+          dockerLogFileUpdateMonitorThread.setDaemon(true);
+          dockerLogFileUpdateMonitorThread.start();
+        }
+      }
+      else if (multiFolder) {
         try {
           threadGroup = new ThreadGroup(getNameForThread());
           if (getFolderMap() != null) {
@@ -181,6 +216,7 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
 
   @Override
   public InputFileMarker getInputMarker() {
+    // TODO: use this
     return null;
   }
 
@@ -190,10 +226,6 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
     LOG.info("init() called");
 
     checkPointExtension = logFeederProps.getCheckPointExtension();
-
-    // Let's close the file and set it to true after we start monitoring it
-    setClosed(true);
-    logPath = getInputDescriptor().getPath();
     checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)getInputDescriptor()).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS);
     detachIntervalMin = (int) ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachIntervalMin(), DEFAULT_DETACH_INTERVAL_MIN * 60);
     detachTimeMin = (int) ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachTimeMin(), DEFAULT_DETACH_TIME_MIN * 60);
@@ -201,23 +233,37 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
     maxAgeMin = (int) ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getMaxAgeMin(), 0);
     boolean initDefaultFields = BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isInitDefaultFields(), false);
     setInitDefaultFields(initDefaultFields);
-    if (StringUtils.isEmpty(logPath)) {
-      LOG.error("path is empty for file input. " + getShortDescription());
-      return;
-    }
 
-    setFilePath(logPath);
-    // Check there can have pattern in folder
-    if (getFilePath() != null && getFilePath().contains("/")) {
-      int lastIndexOfSlash = getFilePath().lastIndexOf("/");
-      String folderBeforeLogName = getFilePath().substring(0, lastIndexOfSlash);
-      if (folderBeforeLogName.contains("*")) {
-        LOG.info("Found regex in folder path ('" + getFilePath() + "'), will check against multiple folders.");
-        setMultiFolder(true);
+    // Let's close the file and set it to true after we start monitoring it
+    setClosed(true);
+    dockerLog = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDockerEnabled(), false);
+    if (dockerLog) {
+      if (logFeederProps.isDockerContainerRegistryEnabled()) {
+        boolean isFileReady = isReady();
+        LOG.info("Container type to monitor " + getType() + ", tail=" + tail + ", isReady=" + isFileReady);
+      } else {
+        LOG.warn("Using docker input, but docker registry usage is not enabled.");
+      }
+    } else {
+      logPath = getInputDescriptor().getPath();
+      if (StringUtils.isEmpty(logPath)) {
+        LOG.error("path is empty for file input. " + getShortDescription());
+        return;
       }
+
+      setFilePath(logPath);
+      // Check there can have pattern in folder
+      if (getFilePath() != null && getFilePath().contains("/")) {
+        int lastIndexOfSlash = getFilePath().lastIndexOf("/");
+        String folderBeforeLogName = getFilePath().substring(0, lastIndexOfSlash);
+        if (folderBeforeLogName.contains("*")) {
+          LOG.info("Found regex in folder path ('" + getFilePath() + "'), will check against multiple folders.");
+          setMultiFolder(true);
+        }
+      }
+      boolean isFileReady = isReady();
+      LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady);
     }
-    boolean isFileReady = isReady();
-    LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady);
 
     LogEntryCacheConfig cacheConfig = logFeederProps.getLogEntryCacheConfig();
     initCache(
@@ -295,6 +341,37 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
     }
   }
 
+  public void startNewChildDockerInputFileThread(DockerMetadata dockerMetadata) throws CloneNotSupportedException {
+    LOG.info("Start docker child input thread - " + dockerMetadata.getLogPath());
+    InputFile clonedObject = (InputFile) this.clone(); // the child is a copy of this (parent) docker input
+    clonedObject.setDockerLogParent(false); // monitor() only spawns children for parents
+    clonedObject.logPath = dockerMetadata.getLogPath();
+    clonedObject.setFilePath(clonedObject.logPath); // not the parent's logPath: init() does not assign it for docker inputs
+    clonedObject.logFiles = new File[]{new File(dockerMetadata.getLogPath())};
+    clonedObject.setInputChildMap(new HashMap<>());
+    clonedObject.setDockerLogFileUpdateMonitorThread(null);
+    copyFilters(clonedObject, getFirstFilter());
+    Thread thread = new Thread(threadGroup, clonedObject, "file=" + dockerMetadata.getLogPath());
+    clonedObject.setThread(thread);
+    inputChildMap.put(dockerMetadata.getLogPath(), clonedObject); // key later used by stopChildDockerInputFileThread
+    thread.start();
+  }
+
+  public void stopChildDockerInputFileThread(String logPathKey) {
+    LOG.info("Stop child input thread - " + logPathKey);
+    // children are registered under the container log path, so the key is used as-is (no file name extraction)
+    InputFile inputFile = inputChildMap.remove(logPathKey);
+    if (inputFile != null) {
+      inputFile.setClosed(true); // mark the child closed before interrupting its thread
+      Thread childThread = inputFile.getThread();
+      if (childThread != null && childThread.isAlive()) {
+        childThread.interrupt();
+      }
+    } else {
+      LOG.warn(logPathKey + " not found as an input child.");
+    }
+  }
+
   public void startNewChildInputFileThread(Map.Entry<String, List<File>> folderFileEntry) throws CloneNotSupportedException {
     LOG.info("Start child input thread - " + folderFileEntry.getKey());
     InputFile clonedObject = (InputFile) this.clone();
@@ -506,8 +583,35 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> {
     this.logFilePathUpdaterThread = logFilePathUpdaterThread;
   }
 
+  /**
+   * Returns the thread held in {@code dockerLogFileUpdateMonitorThread}; may be {@code null}
+   * (child inputs created by {@code startNewChildDockerInputFileThread} have it reset to null).
+   * NOTE(review): presumably the thread running the docker log file update monitor - confirm
+   * where it is assigned.
+   */
+  public Thread getDockerLogFileUpdateMonitorThread() {
+    return dockerLogFileUpdateMonitorThread;
+  }
+
+  /**
+   * Stores the given thread in {@code dockerLogFileUpdateMonitorThread}.
+   *
+   * @param dockerLogFileUpdateMonitorThread thread to store; {@code null} is accepted and is what
+   *                                         {@code startNewChildDockerInputFileThread} sets on clones
+   */
+  public void setDockerLogFileUpdateMonitorThread(Thread dockerLogFileUpdateMonitorThread) {
+    this.dockerLogFileUpdateMonitorThread = dockerLogFileUpdateMonitorThread;
+  }
+
   public Integer getMaxAgeMin() {
     return maxAgeMin;
   }
 
+  /**
+   * Sets the docker container metadata registry used by this input.
+   *
+   * @param dockerContainerRegistry registry instance to store on this input
+   */
+  public void setDockerContainerRegistry(DockerContainerRegistry dockerContainerRegistry) {
+    this.dockerContainerRegistry = dockerContainerRegistry;
+  }
+
+  /**
+   * Returns the docker container metadata registry stored on this input, or {@code null} if none
+   * has been set.
+   */
+  public DockerContainerRegistry getDockerContainerRegistry() {
+    return dockerContainerRegistry;
+  }
+
+  /**
+   * Returns the {@code dockerLog} flag of this input.
+   * NOTE(review): presumably true when the input is configured to read docker container logs -
+   * confirm where the flag is assigned.
+   */
+  public boolean isDockerLog() {
+    return dockerLog;
+  }
+
+  /**
+   * Returns the {@code dockerLogParent} flag of this input. Child inputs created by
+   * {@code startNewChildDockerInputFileThread} always have it set to {@code false}.
+   */
+  public boolean isDockerLogParent() {
+    return dockerLogParent;
+  }
+
+  /**
+   * Sets the {@code dockerLogParent} flag of this input.
+   *
+   * @param dockerLogParent new value of the flag; {@code false} is used for cloned child inputs
+   */
+  public void setDockerLogParent(boolean dockerLogParent) {
+    this.dockerLogParent = dockerLogParent;
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
index 40475c6..ea97968 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
@@ -20,6 +20,8 @@ package org.apache.ambari.logfeeder.input;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistryMonitor;
 import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.apache.ambari.logfeeder.plugin.input.Input;
@@ -65,6 +67,9 @@ public class InputManagerImpl extends InputManager {
   private Thread inputIsReadyMonitor;
 
   @Inject
+  private DockerContainerRegistry dockerContainerRegistry;
+
+  @Inject
   private LogFeederProps logFeederProps;
 
   public List<Input> getInputList(String serviceName) {
@@ -127,6 +132,7 @@ public class InputManagerImpl extends InputManager {
   public void init() throws Exception {
     initCheckPointSettings();
     startMonitorThread();
+    startDockerMetadataThread();
   }
 
   private void initCheckPointSettings() {
@@ -162,6 +168,13 @@ public class InputManagerImpl extends InputManager {
     }
   }
 
+  /**
+   * Starts the "obtain_docker_metadata" thread, which runs a {@link DockerContainerRegistryMonitor}
+   * over the injected registry. Does nothing when the docker container registry is disabled in
+   * the Log Feeder properties.
+   */
+  private void startDockerMetadataThread() {
+    if (!logFeederProps.isDockerContainerRegistryEnabled()) {
+      return;
+    }
+    DockerContainerRegistryMonitor registryMonitor = new DockerContainerRegistryMonitor(dockerContainerRegistry);
+    Thread obtainDockerMetadataThread = new Thread(registryMonitor, "obtain_docker_metadata");
+    obtainDockerMetadataThread.start();
+  }
+
   private void startMonitorThread() {
     inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
       @Override
@@ -199,6 +212,10 @@ public class InputManagerImpl extends InputManager {
   public void startInputs(String serviceName) {
     for (Input input : inputs.get(serviceName)) {
       try {
+        if (input instanceof InputFile) {// apply docker metadata registry
+          InputFile inputFile = (InputFile)  input;
+          inputFile.setDockerContainerRegistry(dockerContainerRegistry);
+        }
         input.init(logFeederProps);
         if (input.isReady()) {
           input.monitor();
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java
new file mode 100644
index 0000000..0275827
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.docker.DockerContainerRegistry;
+import org.apache.ambari.logfeeder.docker.DockerMetadata;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Periodically checks the docker container metadata registry for the log type of the parent input:
+ * starts a child input for every container log file that is not monitored yet and stops child inputs
+ * whose container has been stopped for longer than the detach time.
+ * <br/>
+ * Use cases:<br/>
+ * - log is not monitored yet - container is running -> start monitoring it <br/>
+ * - log is not monitored yet - container is stopped, but not for too long -> start monitoring it <br/>
+ * - log is not monitored yet - container is stopped for too long -> do not monitor it <br/>
+ * - log is monitored already - container is stopped for too long -> remove it from the monitored list <br/>
+ * - log is monitored already - container is stopped, but not for too long -> keep it in the monitored list <br/>
+ * - registry has no container at all for the log type -> remove every monitored child input of the parent <br/>
+ */
+public class DockerLogFileUpdateMonitor extends AbstractLogFileMonitor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DockerLogFileUpdateMonitor.class);
+
+  /**
+   * @param inputFile    docker parent input whose child inputs are managed by this monitor
+   * @param waitInterval passed through to {@link AbstractLogFileMonitor}
+   * @param detachTime   passed through to {@link AbstractLogFileMonitor}; the value returned by
+   *                     {@code getDetachTime()} is interpreted here as minutes
+   */
+  public DockerLogFileUpdateMonitor(InputFile inputFile, int waitInterval, int detachTime) {
+    super(inputFile, waitInterval, detachTime);
+  }
+
+  @Override
+  protected String getStartLog() {
+    return "Start docker component type log files monitor thread for " + getInputFile().getLogType();
+  }
+
+  /**
+   * Runs one synchronization round between the registry content for the parent's log type and the
+   * parent's child inputs (which are keyed by container log path).
+   *
+   * @throws Exception if starting a new child input fails (e.g. the parent cannot be cloned)
+   */
+  @Override
+  protected void monitorAndUpdate() throws Exception {
+    InputFile parentInput = getInputFile();
+    String logType = parentInput.getLogType();
+    Map<String, Map<String, DockerMetadata>> metadataByType = parentInput.getDockerContainerRegistry().getContainerMetadataMap();
+    // iterate over a snapshot: the start/stop calls below modify the parent's live child map
+    Map<String, InputFile> monitoredChildren = new HashMap<>(parentInput.getInputChildMap());
+
+    Map<String, DockerMetadata> containersOfType = metadataByType.get(logType);
+    if (containersOfType == null) {
+      stopAll(parentInput, monitoredChildren, logType);
+      return;
+    }
+
+    long now = new Date().getTime();
+    for (DockerMetadata container : containersOfType.values()) {
+      String logPath = container.getLogPath();
+      String containerId = container.getId();
+      LOG.debug("Found log path: {} (container id: {})", logPath, containerId);
+      // a container counts as expired once it is not running and its timestamp is older than the detach time
+      boolean expired = !container.isRunning() && isItTooOld(container.getTimestamp(), now, getDetachTime());
+      if (!monitoredChildren.containsKey(logPath)) {
+        if (expired) {
+          LOG.debug("Container with id {} is stopped, won't monitor as it stopped for long time.", containerId);
+        } else {
+          LOG.info("Found new container (id: {}) with new log path: {}", containerId, logPath);
+          parentInput.startNewChildDockerInputFileThread(container);
+        }
+      } else if (expired) {
+        LOG.info("Removing: {}", logPath);
+        // children are registered under their log path, so that is the key to stop them by
+        parentInput.stopChildDockerInputFileThread(logPath);
+      }
+    }
+  }
+
+  /**
+   * Stops every child input from the given snapshot; used when the registry holds no container for the log type.
+   */
+  private void stopAll(InputFile parentInput, Map<String, InputFile> monitoredChildren, String logType) {
+    if (monitoredChildren.isEmpty()) {
+      return;
+    }
+    LOG.info("Removing all inputs with type: {}", logType);
+    for (String childKey : monitoredChildren.keySet()) {
+      LOG.info("Removing: {}", childKey);
+      parentInput.stopChildDockerInputFileThread(childKey);
+    }
+  }
+
+  /**
+   * @param timestamp       reference time in epoch milliseconds
+   * @param actualTimestamp current time in epoch milliseconds
+   * @param maxDiffMinutes  maximum allowed age in minutes
+   * @return true if {@code timestamp} is more than {@code maxDiffMinutes} minutes before {@code actualTimestamp}
+   */
+  private static boolean isItTooOld(long timestamp, long actualTimestamp, long maxDiffMinutes) {
+    long diffMillis = actualTimestamp - timestamp;
+    long maxDiffMillis = maxDiffMinutes * 1000 * 60;
+    return diffMillis > maxDiffMillis;
+  }
+}
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
index efa56a2..012455e 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
@@ -40,6 +40,9 @@ public class LSServerInputFile extends LSServerInputFileBase {
   @JsonProperty("max_age_min")
   private Integer maxAgeMin;
 
+  @JsonProperty("docker")
+  private Boolean dockerEnabled;
+
   public LSServerInputFile() {}
 
   public LSServerInputFile(InputDescriptor inputDescriptor) {
@@ -49,6 +52,7 @@ public class LSServerInputFile extends LSServerInputFileBase {
     this.detachTimeMin = inputFileDescriptor.getDetachTimeMin();
     this.pathUpdateIntervalMin = inputFileDescriptor.getPathUpdateIntervalMin();
     this.maxAgeMin = inputFileDescriptor.getMaxAgeMin();
+    this.dockerEnabled = inputFileDescriptor.getDockerEnabled();
   }
 
   public Integer getDetachIntervalMin() {
@@ -82,4 +86,12 @@ public class LSServerInputFile extends LSServerInputFileBase {
   public void setMaxAgeMin(Integer maxAgeMin) {
     this.maxAgeMin = maxAgeMin;
   }
+
+  /**
+   * Returns the value mapped to the {@code "docker"} JSON property; {@code null} when it was not set.
+   */
+  public Boolean getDockerEnabled() {
+    return dockerEnabled;
+  }
+
+  /**
+   * Sets the value mapped to the {@code "docker"} JSON property.
+   *
+   * @param dockerEnabled new value; {@code null} is stored as-is
+   */
+  public void setDockerEnabled(Boolean dockerEnabled) {
+    this.dockerEnabled = dockerEnabled;
+  }
 }
diff --git a/ambari-logsearch/docker/docker-compose.yml b/ambari-logsearch/docker/docker-compose.yml
index 99e0f18..b73ee5c 100644
--- a/ambari-logsearch/docker/docker-compose.yml
+++ b/ambari-logsearch/docker/docker-compose.yml
@@ -47,6 +47,8 @@ services:
     image: ambari-logsearch:v1.0
     restart: always
     hostname: logsearch.apache.org
+    labels:
+      logfeeder.log.type: "logsearch_server"
     networks:
       - logsearch-network
     env_file:
@@ -68,6 +70,9 @@ services:
     image: ambari-logsearch:v1.0
     restart: always
     hostname: logfeeder.apache.org
+    privileged: true
+    labels:
+      logfeeder.log.type: "logfeeder"
     networks:
       - logsearch-network
     env_file:
@@ -82,6 +87,9 @@ services:
       - $AMBARI_LOCATION:/root/ambari
       - $AMBARI_LOCATION/ambari-logsearch/docker/test-logs:/root/test-logs
       - $AMBARI_LOCATION/ambari-logsearch/docker/test-config:/root/test-config
+      - /var/run/docker.sock:/var/run/docker.sock
+      - /usr/local/bin/docker:/usr/local/bin/docker
+      - /var/lib/docker:/var/lib/docker
 networks:
    logsearch-network:
       driver: bridge
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
index 16cbb0f..850aca2 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
@@ -29,4 +29,5 @@ logfeeder.cache.key.field=log_message
 logfeeder.cache.dedup.interval=1000
 logfeeder.cache.last.dedup.enabled=true
 logsearch.config.zk_connect_string=localhost:9983
-logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN
\ No newline at end of file
+logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN
+logfeeder.docker.registry.enabled=true
\ No newline at end of file
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-logsearch-docker.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-logsearch-docker.json
new file mode 100644
index 0000000..a420960
--- /dev/null
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-logsearch-docker.json
@@ -0,0 +1,31 @@
+{
+  "input": [
+    {
+      "type": "logsearch_server",
+      "rowtype": "service",
+      "docker": "true"
+    }
+  ],
+  "filter": [
+    {
+      "filter": "grok",
+      "conditions": {
+        "fields": {
+          "type": [
+            "logsearch_server"
+          ]
+        }
+      },
+      "log4j_format": "",
+      "multiline_pattern": "^(%{DATESTAMP:logtime})",
+      "message_pattern": "(?m)^%{DATESTAMP:logtime}%{SPACE}\\[%{DATA:thread_name}\\]%{SPACE}%{LOGLEVEL:level}%{SPACE}%{JAVACLASS}%{SPACE}\\(%{JAVAFILE:file}:%{INT:line_number}\\)%{SPACE}-%{SPACE}%{GREEDYDATA:log_message}",
+      "post_map_values": {
+        "logtime": {
+          "map_date": {
+            "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
+          }
+        }
+      }
+    }
+  ]
+}
diff --git a/ambari-logsearch/pom.xml b/ambari-logsearch/pom.xml
index 5fbbb33..6d5f9f1 100644
--- a/ambari-logsearch/pom.xml
+++ b/ambari-logsearch/pom.xml
@@ -33,6 +33,7 @@
     <module>ambari-logsearch-config-zookeeper</module>
     <module>ambari-logsearch-it</module>
     <module>ambari-logsearch-logfeeder-plugin-api</module>
+    <module>ambari-logsearch-logfeeder-container-registry</module>
   </modules>
   <properties>
     <jdk.version>1.8</jdk.version>