You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mg...@apache.org on 2017/06/19 10:36:41 UTC

ambari git commit: AMBARI-21181 Ability to anonymize data during log processing (mgergely)

Repository: ambari
Updated Branches:
  refs/heads/trunk b7d422526 -> 49da047ec


AMBARI-21181 Ability to anonymize data during log processing (mgergely)

Change-Id: I93c62828a945567d75b638c475046cb77ad3f5e0


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/49da047e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/49da047e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/49da047e

Branch: refs/heads/trunk
Commit: 49da047ecd0effd228c413425c73432f40073ac3
Parents: b7d4225
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Mon Jun 19 12:36:31 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Mon Jun 19 12:36:31 2017 +0200

----------------------------------------------------------------------
 .../inputconfig/MapAnonymizeDescriptor.java     |  26 ++++
 .../model/inputconfig/MapDateDescriptor.java    |   2 +-
 .../model/inputconfig/MapFieldDescriptor.java   |   2 +-
 .../inputconfig/MapFieldValueDescriptor.java    |   2 +-
 .../impl/MapAnonymizeDescriptorImpl.java        |  57 +++++++++
 .../inputconfig/impl/PostMapValuesAdapter.java  |   3 +
 .../docs/postMapValues.md                       |   9 ++
 .../logfeeder/input/AbstractInputFile.java      | 116 +++++++++---------
 .../apache/ambari/logfeeder/input/Input.java    |   5 +-
 .../ambari/logfeeder/input/InputManager.java    |   5 +
 .../ambari/logfeeder/input/InputS3File.java     |   6 +
 .../logfeeder/mapper/MapperAnonymize.java       | 120 +++++++++++++++++++
 .../apache/ambari/logfeeder/util/FileUtil.java  |   7 ++
 .../src/main/resources/alias_config.json        |   3 +
 .../logfeeder/mapper/MapperAnonymizeTest.java   |  79 ++++++++++++
 .../src/test/resources/log4j.xml                |   2 +-
 .../logsearch/model/common/LSServerMapDate.java |   3 -
 .../model/common/LSServerMapField.java          |   3 +
 .../model/common/LSServerMapFieldAnonymize.java |  65 ++++++++++
 .../LSServerPostMapValuesListDeserializer.java  |   4 +
 .../shipper-conf/input.config-hst.json          |   4 +-
 .../shipper-conf/input.config-zookeeper.json    |   4 +-
 22 files changed, 458 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapAnonymizeDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapAnonymizeDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapAnonymizeDescriptor.java
new file mode 100644
index 0000000..2533155
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapAnonymizeDescriptor.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface MapAnonymizeDescriptor extends MapFieldDescriptor {
+  String getPattern();
+
+  Character getHideChar();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java
index f88435f..985d221 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java
@@ -22,5 +22,5 @@ package org.apache.ambari.logsearch.config.api.model.inputconfig;
 public interface MapDateDescriptor extends MapFieldDescriptor {
   String getSourceDatePattern();
 
-  public String getTargetDatePattern();
+  String getTargetDatePattern();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java
index db086c5..f5a2e35 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java
@@ -20,5 +20,5 @@
 package org.apache.ambari.logsearch.config.api.model.inputconfig;
 
 public interface MapFieldDescriptor {
-  public String getJsonName();
+  String getJsonName();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java
index cf37e62..f039958 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java
@@ -22,5 +22,5 @@ package org.apache.ambari.logsearch.config.api.model.inputconfig;
 public interface MapFieldValueDescriptor extends MapFieldDescriptor {
   String getPreValue();
 
-  public String getPostValue();
+  String getPostValue();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapAnonymizeDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapAnonymizeDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapAnonymizeDescriptorImpl.java
new file mode 100644
index 0000000..5fdbbab
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapAnonymizeDescriptorImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapAnonymizeDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public class MapAnonymizeDescriptorImpl implements MapAnonymizeDescriptor {
+  @Override
+  public String getJsonName() {
+    return "map_anonymize";
+  }
+
+  @Expose
+  private String pattern;
+
+  @Expose
+  @SerializedName("hide_char")
+  private Character hideChar;
+
+  @Override
+  public String getPattern() {
+    return pattern;
+  }
+
+  public void setPattern(String pattern) {
+    this.pattern = pattern;
+  }
+
+  @Override
+  public Character getHideChar() {
+    return hideChar;
+  }
+
+  public void setHideChar(Character hideChar) {
+    this.hideChar = hideChar;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
index 32aded8..3c21fd8 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
@@ -66,6 +66,9 @@ public class PostMapValuesAdapter implements JsonDeserializer<List<PostMapValues
         case "map_fieldvalue":
           mappers.add((MapFieldValueDescriptorImpl)context.deserialize(m.getValue(), MapFieldValueDescriptorImpl.class));
           break;
+        case "map_anonymize":
+          mappers.add((MapAnonymizeDescriptorImpl)context.deserialize(m.getValue(), MapAnonymizeDescriptorImpl.class));
+          break;
         default:
           System.out.println("Unknown key: " + m.getKey());
       }

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/docs/postMapValues.md
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/docs/postMapValues.md b/ambari-logsearch/ambari-logsearch-logfeeder/docs/postMapValues.md
index 906359a..7ec439a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/docs/postMapValues.md
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/docs/postMapValues.md
@@ -59,3 +59,12 @@ The name of the mapping element should be map\_fieldvalue. The value json elemen
 |-------------|--------------------------------------------------------------------|
 | pre\_value  | The value that the field must match \(ignoring case\) to be mapped |
 | post\_value | The value to which the field is modified to                        |
+
+## Map Anonymize
+
+The name of the mapping element should be map\_anonymize. The value json element should contain the following parameters:
+
+| Field      | Description                                                                                                     |
+|------------|-----------------------------------------------------------------------------------------------------------------|
+| pattern    | The pattern to use to identify parts to anonymize. The parts to hide should be marked with the `<hide>` string. |
+| hide\_char | The character to hide with, if it is not specified then the default is '\*'                                     |

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
index 2bc4e68..ab50eb7 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -28,6 +28,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.ambari.logfeeder.util.FileUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
 import org.apache.commons.lang.BooleanUtils;
@@ -48,7 +49,6 @@ public abstract class AbstractInputFile extends Input {
 
   private String checkPointExtension;
   private File checkPointFile;
-  private RandomAccessFile checkPointWriter;
   private long lastCheckPointTimeMS;
   private int checkPointIntervalMS;
   private Map<String, Object> jsonCheckPoint;
@@ -93,7 +93,6 @@ public abstract class AbstractInputFile extends Input {
     LOG.info("Monitoring logPath=" + logPath + ", logPathFile=" + logPathFile);
     BufferedReader br = null;
     checkPointFile = null;
-    checkPointWriter = null;
     jsonCheckPoint = null;
 
     int lineCount = 0;
@@ -105,6 +104,7 @@ public abstract class AbstractInputFile extends Input {
       boolean resume = true;
       int resumeFromLineNumber = getResumeFromLineNumber();
       if (resumeFromLineNumber > 0) {
+        LOG.info("Resuming log file " + logPathFile.getAbsolutePath() + " from line number " + resumeFromLineNumber);
         resume = false;
       }
       
@@ -211,27 +211,29 @@ public abstract class AbstractInputFile extends Input {
         String checkPointFileName = base64FileKey + checkPointExtension;
         File checkPointFolder = inputManager.getCheckPointFolderFile();
         checkPointFile = new File(checkPointFolder, checkPointFileName);
-        checkPointWriter = new RandomAccessFile(checkPointFile, "rw");
-
-        try {
-          int contentSize = checkPointWriter.readInt();
-          byte b[] = new byte[contentSize];
-          int readSize = checkPointWriter.read(b, 0, contentSize);
-          if (readSize != contentSize) {
-            LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" +
-                readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription());
-          } else {
-            String jsonCheckPointStr = new String(b, 0, readSize);
-            jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
-
-            resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
-
-            LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
-                ", resumeFromLineNumber=" + resumeFromLineNumber);
+        if (!checkPointFile.exists()) {
+          LOG.info("Checkpoint file for log file " + filePath + " doesn't exist, starting to read it from the beginning");
+        } else {
+          try (RandomAccessFile checkPointWriter = new RandomAccessFile(checkPointFile, "rw")) {
+            int contentSize = checkPointWriter.readInt();
+            byte b[] = new byte[contentSize];
+            int readSize = checkPointWriter.read(b, 0, contentSize);
+            if (readSize != contentSize) {
+              LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" +
+                  readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription());
+            } else {
+              String jsonCheckPointStr = new String(b, 0, readSize);
+              jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+              resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+
+              LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
+                  ", resumeFromLineNumber=" + resumeFromLineNumber);
+           }
+          } catch (EOFException eofEx) {
+            LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
+                getShortDescription(), eofEx);
           }
-        } catch (EOFException eofEx) {
-          LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
-              getShortDescription());
         }
         if (jsonCheckPoint == null) {
           // This seems to be first time, so creating the initial checkPoint object
@@ -250,43 +252,48 @@ public abstract class AbstractInputFile extends Input {
 
   @Override
   public synchronized void checkIn(InputMarker inputMarker) {
-    if (checkPointWriter != null) {
-      try {
-        int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
-        if (lineNumber > inputMarker.lineNumber) {
-          // Already wrote higher line number for this input
-          return;
-        }
-        // If interval is greater than last checkPoint time, then write
-        long currMS = System.currentTimeMillis();
-        if (!isClosed() && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
-          // Let's save this one so we can update the check point file on flush
-          lastCheckPointInputMarker = inputMarker;
-          return;
-        }
-        lastCheckPointTimeMS = currMS;
+    try {
+      int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+      if (lineNumber > inputMarker.lineNumber) {
+        // Already wrote higher line number for this input
+        return;
+      }
+      // If interval is greater than last checkPoint time, then write
+      long currMS = System.currentTimeMillis();
+      if (!isClosed() && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
+        // Let's save this one so we can update the check point file on flush
+        lastCheckPointInputMarker = inputMarker;
+        return;
+      }
+      lastCheckPointTimeMS = currMS;
 
-        jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber));
-        jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
-        jsonCheckPoint.put("last_write_time_date", new Date());
+      jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber));
+      jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
+      jsonCheckPoint.put("last_write_time_date", new Date());
 
-        String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
+      String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
 
-        // Let's rewind
-        checkPointWriter.seek(0);
-        checkPointWriter.writeInt(jsonStr.length());
-        checkPointWriter.write(jsonStr.getBytes());
+      File tmpCheckPointFile = new File(checkPointFile.getAbsolutePath() + ".tmp");
+      if (tmpCheckPointFile.exists()) {
+        tmpCheckPointFile.delete();
+      }
+      RandomAccessFile tmpRaf = new RandomAccessFile(tmpCheckPointFile, "rws");
+      tmpRaf.writeInt(jsonStr.length());
+      tmpRaf.write(jsonStr.getBytes());
+      tmpRaf.getFD().sync();
+      tmpRaf.close();
+      
+      FileUtil.move(tmpCheckPointFile, checkPointFile);
 
-        if (isClosed()) {
-          String logMessageKey = this.getClass().getSimpleName() + "_FINAL_CHECKIN";
-          LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + getShortDescription() +
-              ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO);
-        }
-      } catch (Throwable t) {
-        String logMessageKey = this.getClass().getSimpleName() + "_CHECKIN_EXCEPTION";
-        LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + getShortDescription(), t,
-            LOG, Level.ERROR);
+      if (isClosed()) {
+        String logMessageKey = this.getClass().getSimpleName() + "_FINAL_CHECKIN";
+        LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + getShortDescription() +
+            ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO);
       }
+    } catch (Throwable t) {
+      String logMessageKey = this.getClass().getSimpleName() + "_CHECKIN_EXCEPTION";
+      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + getShortDescription(), t,
+          LOG, Level.ERROR);
     }
   }
 
@@ -302,6 +309,7 @@ public abstract class AbstractInputFile extends Input {
     super.close();
     LOG.info("close() calling checkPoint checkIn(). " + getShortDescription());
     lastCheckIn();
+    isClosed = true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index fba596d..27d16c4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -59,7 +59,7 @@ public abstract class Input extends ConfigItem implements Runnable {
   private String type;
   protected String filePath;
   private Filter firstFilter;
-  private boolean isClosed;
+  protected boolean isClosed;
 
   protected boolean tail;
   private boolean useEventMD5;
@@ -237,13 +237,10 @@ public abstract class Input extends ConfigItem implements Runnable {
     try {
       if (firstFilter != null) {
         firstFilter.close();
-      } else {
-        outputManager.close();
       }
     } catch (Throwable t) {
       // Ignore
     }
-    isClosed = true;
   }
 
   private void initCache() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
index 8c76785..19894ae 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
@@ -77,6 +77,11 @@ public class InputManager {
     for (Input input : inputList) {
       input.setDrain(true);
     }
+    for (Input input : inputList) {
+      while (!input.isClosed()) {
+        try { Thread.sleep(100); } catch (InterruptedException e) {}
+      }
+    }
     inputList.clear();
     inputs.remove(serviceName);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index 4bf162b..2b19503 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -92,4 +92,10 @@ public class InputS3File extends AbstractInputFile {
   protected Object getFileKey(File logFile) {
     return logFile.getPath();
   }
+  
+  @Override
+  public void close() {
+    super.close();
+    isClosed = true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java
new file mode 100644
index 0000000..c85ad49
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapAnonymizeDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.commons.lang.CharUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Splitter;
+
+public class MapperAnonymize extends Mapper {
+  private static final Logger LOG = Logger.getLogger(MapperAnonymize.class);
+  
+  private static final char DEFAULT_HIDE_CHAR = '*';
+
+  private String pattern;
+  private Iterable<String> patternParts;
+  private char hideChar;
+
+  @Override
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
+    init(inputDesc, fieldName, mapClassCode);
+    
+    pattern = ((MapAnonymizeDescriptor)mapFieldDescriptor).getPattern();
+    if (StringUtils.isEmpty(pattern)) {
+      LOG.fatal("pattern is empty.");
+      return false;
+    }
+    
+    patternParts = Splitter.on("<hide>").omitEmptyStrings().split(pattern);
+    hideChar = CharUtils.toChar(((MapAnonymizeDescriptor)mapFieldDescriptor).getHideChar(), DEFAULT_HIDE_CHAR);
+    
+    return true;
+  }
+
+  @Override
+  public Object apply(Map<String, Object> jsonObj, Object value) {
+    if (value != null) {
+      try {
+        hide((String)value, jsonObj);
+      } catch (Throwable t) {
+        LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply", "Error applying anonymization." +
+            " pattern=" + pattern + ", hideChar=" + hideChar, t, LOG, Level.ERROR);
+      }
+    }
+    return value;
+  }
+  
+  private void hide(String value, Map<String, Object> jsonObj) {
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    String rest = value;
+    for (String patternPart : patternParts) {
+      int pos = rest.indexOf(patternPart);
+      if (pos == -1) {
+        return;
+      }
+      
+      int end = pos + patternPart.length();
+      if (first) {
+        if (pattern.startsWith("<hide>")) {
+          String beginning = rest.substring(0, pos);
+          int spacePos = beginning.lastIndexOf(" ");
+          if (spacePos == -1) {
+            sb.append(StringUtils.repeat(hideChar, beginning.length()));
+          } else {
+            sb.append(beginning.substring(0, spacePos+1));
+            sb.append(StringUtils.repeat(hideChar, beginning.length() - spacePos - 1));
+          }
+          sb.append(rest.substring(pos, end));
+        } else {
+          sb.append(rest.substring(0, end));
+        }
+        first = false;
+      } else {
+        sb.append(StringUtils.repeat(hideChar, pos));
+        sb.append(rest.substring(pos, end));
+      }
+      rest = rest.substring(end);
+    }
+    
+    if (pattern.endsWith("<hide>")) {
+      int spacePos = rest.indexOf(" ");
+      if (spacePos == -1) {
+        sb.append(StringUtils.repeat(hideChar, rest.length()));
+        rest = "";
+      } else {
+        sb.append(StringUtils.repeat(hideChar, spacePos));
+        rest = rest.substring(spacePos);
+      }
+    }
+    
+    sb.append(rest);
+    
+    jsonObj.put(fieldName, sb.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
index ffd6cec..94d6558 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
@@ -25,6 +25,7 @@ import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -89,4 +90,10 @@ public class FileUtil {
     }
     return new HashMap<String, Object>();
   }
+  
+  /**
+   * Moves {@code source} to {@code target}.
+   * <p>
+   * An atomic move is attempted first. {@link Files#move} ignores all other options when
+   * {@link StandardCopyOption#ATOMIC_MOVE} is passed, and whether an existing target is replaced is
+   * then implementation specific. If the file system cannot perform the move atomically (for
+   * example across file stores), a regular move with {@link StandardCopyOption#REPLACE_EXISTING}
+   * is used instead of failing.
+   *
+   * @param source the file to move
+   * @param target the destination file
+   * @throws IOException if neither the atomic nor the fallback move succeeds
+   */
+  public static void move(File source, File target) throws IOException {
+    Path sourcePath = Paths.get(source.getAbsolutePath());
+    Path targetPath = Paths.get(target.getAbsolutePath());
+    try {
+      Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE);
+    } catch (java.nio.file.AtomicMoveNotSupportedException e) {
+      // Fully qualified to avoid a new import; fall back to a non-atomic move that overwrites the target.
+      Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
index e2ed625..4656b5b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
@@ -33,6 +33,9 @@
     },
     "map_fieldvalue": {
       "klass": "org.apache.ambari.logfeeder.mapper.MapperFieldValue"
+    },
+    "map_anonymize": {
+      "klass": "org.apache.ambari.logfeeder.mapper.MapperAnonymize"
     }
   },
   "output": {

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java
new file mode 100644
index 0000000..a0b96c0
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperAnonymizeTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.MapAnonymizeDescriptorImpl;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link MapperAnonymize}: checks that the parts of a value matched by the
+ * {@code <hide>} placeholders of the configured pattern are masked, and that initialization is
+ * refused when no pattern is configured.
+ */
+public class MapperAnonymizeTest {
+  private static final Logger LOG = Logger.getLogger(MapperAnonymizeTest.class);
+
+  private static final String FIELD_NAME = "someField";
+
+  /** No hide character is configured; the masked parts are expected to be rendered with 'X'. */
+  @Test
+  public void testMapperAnonymize_anonymize() {
+    LOG.info("testMapperAnonymize_anonymize()");
+
+    assertAnonymized("secret <hide> / <hide> is here", null,
+        "something else secret SECRET1 / SECRET2 is here something else 2",
+        "something else secret XXXXXXX / XXXXXXX is here something else 2");
+  }
+
+  /** The pattern starts with a placeholder and a custom hide character is configured. */
+  @Test
+  public void testMapperAnonymize_anonymize2() {
+    LOG.info("testMapperAnonymize_anonymize2()");
+
+    assertAnonymized("<hide> / <hide> is the secret", '*',
+        "something else SECRET1 / SECRET2 is the secret something else 2",
+        "something else ******* / ******* is the secret something else 2");
+  }
+
+  /** A descriptor without a pattern must make {@code init} return false. */
+  @Test
+  public void testMapperAnonymize_noPattern() {
+    LOG.info("testMapperAnonymize_noPattern()");
+
+    MapAnonymizeDescriptorImpl mapAnonymizeDescriptorImpl = new MapAnonymizeDescriptorImpl();
+
+    MapperAnonymize mapperAnonymize = new MapperAnonymize();
+    // The message is reported when init() unexpectedly succeeds, so it must say exactly that.
+    assertFalse("Was able to initialize!", mapperAnonymize.init(null, FIELD_NAME, null, mapAnonymizeDescriptorImpl));
+  }
+
+  /**
+   * Initializes a mapper for {@link #FIELD_NAME} with the given pattern, applies it to
+   * {@code value} and asserts that the field holds {@code expected} and nothing else was added.
+   *
+   * @param pattern the anonymization pattern to configure
+   * @param hideChar the hide character to configure, or {@code null} to leave it unset
+   * @param value the raw value passed to the mapper
+   * @param expected the anonymized value expected in the output map
+   */
+  private static void assertAnonymized(String pattern, Character hideChar, String value, String expected) {
+    MapAnonymizeDescriptorImpl descriptor = new MapAnonymizeDescriptorImpl();
+    descriptor.setPattern(pattern);
+    if (hideChar != null) {
+      descriptor.setHideChar(hideChar);
+    }
+
+    MapperAnonymize mapper = new MapperAnonymize();
+    assertTrue("Could not initialize!", mapper.init(null, FIELD_NAME, null, descriptor));
+
+    Map<String, Object> jsonObj = new HashMap<>();
+    mapper.apply(jsonObj, value);
+
+    assertEquals("Field wasnt anonymized", expected, jsonObj.remove(FIELD_NAME));
+    assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml
index e641018..1d28fcc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml
@@ -15,7 +15,7 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<!DOCTYPE log4j:configuration SYSTEM "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
 
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
   <appender name="console" class="org.apache.log4j.ConsoleAppender">

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java
index dcb0393..96e0287 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java
@@ -23,11 +23,8 @@ import javax.validation.constraints.NotNull;
 
 import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
 
-@JsonInclude(Include.NON_NULL)
 public class LSServerMapDate extends LSServerMapField {
   @Override
   public String getName() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java
index b18439c..df33da1 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java
@@ -20,11 +20,14 @@
 package org.apache.ambari.logsearch.model.common;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
 
 import io.swagger.annotations.ApiModel;
 
 @ApiModel
 @JsonIgnoreProperties(value = { "name" }) // the "name" property exposed by getName() is ignored by Jackson
+@JsonInclude(Include.NON_NULL) // properties whose value is null are left out of the serialized JSON
 public abstract class LSServerMapField { // common base of the mapper models, e.g. LSServerMapFieldAnonymize
   public abstract String getName(); // mapper identifier; LSServerMapFieldAnonymize returns "map_anonymize"
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldAnonymize.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldAnonymize.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldAnonymize.java
new file mode 100644
index 0000000..9fb589e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldAnonymize.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapAnonymizeDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Server side model of the {@code map_anonymize} mapper entry, holding the anonymization pattern
+ * and the hide character read from a {@link MapAnonymizeDescriptor}.
+ */
+@ApiModel
+public class LSServerMapFieldAnonymize extends LSServerMapField {
+  /** Identifier returned by {@link #getName()}. */
+  private static final String NAME = "map_anonymize";
+
+  /** The anonymization pattern; must not be null. */
+  @NotNull
+  private String pattern;
+
+  /** The hide character, mapped to the {@code hide_char} JSON property. */
+  @JsonProperty("hide_char")
+  private Character hideChar;
+
+  /** Creates an empty instance; the properties are filled in through the setters. */
+  public LSServerMapFieldAnonymize() {
+  }
+
+  /**
+   * Creates an instance from the pattern and hide character of the given descriptor.
+   *
+   * @param mapAnonymizeDescriptor the descriptor to copy the values from
+   */
+  public LSServerMapFieldAnonymize(MapAnonymizeDescriptor mapAnonymizeDescriptor) {
+    pattern = mapAnonymizeDescriptor.getPattern();
+    hideChar = mapAnonymizeDescriptor.getHideChar();
+  }
+
+  /** @return the identifier of this mapper, {@code "map_anonymize"} */
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  /** @return the anonymization pattern */
+  public String getPattern() {
+    return pattern;
+  }
+
+  /** @param pattern the anonymization pattern to set */
+  public void setPattern(String pattern) {
+    this.pattern = pattern;
+  }
+
+  /** @return the hide character, as stored */
+  public Character getHideChar() {
+    return hideChar;
+  }
+
+  /** @param hideChar the hide character to set */
+  public void setHideChar(Character hideChar) {
+    this.hideChar = hideChar;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesListDeserializer.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesListDeserializer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesListDeserializer.java
index 18744e2..258f64a 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesListDeserializer.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesListDeserializer.java
@@ -64,6 +64,10 @@ public class LSServerPostMapValuesListDeserializer extends JsonDeserializer<LSSe
             LSServerMapFieldCopy mapFieldCopy = oc.treeToValue((TreeNode)mapperProperties, LSServerMapFieldCopy.class);
             mappers.add(mapFieldCopy);
             break;
+          case "map_anonymize" :
+            LSServerMapFieldAnonymize mapAnonyimize = oc.treeToValue((TreeNode)mapperProperties, LSServerMapFieldAnonymize.class);
+            mappers.add(mapAnonyimize);
+            break;
         }
       }
       

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-hst.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-hst.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-hst.json
index 4ccef74..2ef13a2 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-hst.json
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-hst.json
@@ -22,7 +22,7 @@
       "post_map_values": {
         "logtime": {
           "map_date": {
-            "date_pattern":"dd MMM yyyy HH:mm:ss"
+            "target_date_pattern":"dd MMM yyyy HH:mm:ss"
           }
         },
         "level": {
@@ -34,4 +34,4 @@
       }
     }
   ]
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/49da047e/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
index d3685a4..fd1af97 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
@@ -25,10 +25,10 @@
       "post_map_values": {
         "logtime": {
           "map_date": {
-            "date_pattern": "yyyy-MM-dd HH:mm:ss,SSS"
+            "target_date_pattern": "yyyy-MM-dd HH:mm:ss,SSS"
           }
         }
       }
     }
   ]
-}
\ No newline at end of file
+}