You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mg...@apache.org on 2017/05/22 10:50:13 UTC

[1/4] ambari git commit: AMBARI-21033 Log Search use POJOs for input configuration (mgergely)

Repository: ambari
Updated Branches:
  refs/heads/trunk cbb1e9059 -> fd4a7a46a


http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
new file mode 100644
index 0000000..5c547ad
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Concrete {@link LSServerInputFileBase} that declares no properties of its own.
+ */
+@ApiModel
+public class LSServerInputFile extends LSServerInputFileBase {
+  /**
+   * Passes the descriptor on to the base class constructor.
+   *
+   * @param inputDescriptor the descriptor handed over to {@link LSServerInputFileBase}
+   */
+  public LSServerInputFile(InputDescriptor inputDescriptor) {
+    super(inputDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFileBase.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFileBase.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFileBase.java
new file mode 100644
index 0000000..df21d0d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFileBase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Base of the file based input models: holds the checkpoint interval and the process / copy file
+ * flags on top of what {@link LSServerInput} carries.
+ */
+@ApiModel
+public abstract class LSServerInputFileBase extends LSServerInput {
+  @JsonProperty("checkpoint_interval_ms")
+  private Integer checkpointIntervalMs;
+
+  @JsonProperty("process_file")
+  private Boolean processFile;
+
+  @JsonProperty("copy_file")
+  private Boolean copyFile;
+
+  /**
+   * Copies the file related values out of the descriptor.
+   *
+   * @param inputDescriptor the source descriptor; it is cast to {@link InputFileBaseDescriptor}, so a
+   *                        descriptor of any other type fails with a {@link ClassCastException}
+   */
+  public LSServerInputFileBase(InputDescriptor inputDescriptor) {
+    super(inputDescriptor);
+
+    InputFileBaseDescriptor fileDescriptor = (InputFileBaseDescriptor) inputDescriptor;
+    checkpointIntervalMs = fileDescriptor.getCheckpointIntervalMs();
+    processFile = fileDescriptor.getProcessFile();
+    copyFile = fileDescriptor.getCopyFile();
+  }
+
+  /** @return the value exposed as the {@code checkpoint_interval_ms} property */
+  public Integer getCheckpointIntervalMs() {
+    return checkpointIntervalMs;
+  }
+
+  public void setCheckpointIntervalMs(Integer checkpointIntervalMs) {
+    this.checkpointIntervalMs = checkpointIntervalMs;
+  }
+
+  /** @return the value exposed as the {@code process_file} property */
+  public Boolean getProcessFile() {
+    return processFile;
+  }
+
+  public void setProcessFile(Boolean processFile) {
+    this.processFile = processFile;
+  }
+
+  /** @return the value exposed as the {@code copy_file} property */
+  public Boolean getCopyFile() {
+    return copyFile;
+  }
+
+  public void setCopyFile(Boolean copyFile) {
+    this.copyFile = copyFile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java
new file mode 100644
index 0000000..8e9acf0
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * File based input model extended with the S3 access key and secret key.
+ * <p>
+ * NOTE(review): the secret key is a regular {@code s3_secret_key} JSON property, so it is written out
+ * in clear text wherever this model is serialized - confirm that every consumer may see it.
+ */
+@ApiModel
+public class LSServerInputS3File extends LSServerInputFileBase {
+  @JsonProperty("s3_access_key")
+  private String s3AccessKey;
+
+  @JsonProperty("s3_secret_key")
+  private String s3SecretKey;
+
+  /**
+   * Copies the S3 credentials out of the descriptor.
+   *
+   * @param inputDescriptor the source descriptor; it is cast to {@link InputS3FileDescriptor}
+   */
+  public LSServerInputS3File(InputDescriptor inputDescriptor) {
+    super(inputDescriptor);
+
+    InputS3FileDescriptor s3Descriptor = (InputS3FileDescriptor) inputDescriptor;
+    s3AccessKey = s3Descriptor.getS3AccessKey();
+    s3SecretKey = s3Descriptor.getS3SecretKey();
+  }
+
+  public String getS3AccessKey() {
+    return s3AccessKey;
+  }
+
+  public void setS3AccessKey(String s3AccessKey) {
+    this.s3AccessKey = s3AccessKey;
+  }
+
+  public String getS3SecretKey() {
+    return s3SecretKey;
+  }
+
+  public void setS3SecretKey(String s3SecretKey) {
+    this.s3SecretKey = s3SecretKey;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java
new file mode 100644
index 0000000..dcacceb
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapDate.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Model of the {@code map_date} mapper, carrying the source and the target date pattern.
+ * <p>
+ * Properties holding {@code null} are left out of the JSON output ({@link Include#NON_NULL}).
+ */
+@JsonInclude(Include.NON_NULL)
+public class LSServerMapDate extends LSServerMapField {
+  @JsonProperty("source_date_pattern")
+  private String sourceDatePattern;
+
+  @JsonProperty("target_date_pattern")
+  private String targetDatePattern;
+
+  /**
+   * Copies both date patterns out of the descriptor.
+   *
+   * @param mapDateDescriptor the descriptor to read the patterns from
+   */
+  public LSServerMapDate(MapDateDescriptor mapDateDescriptor) {
+    sourceDatePattern = mapDateDescriptor.getSourceDatePattern();
+    targetDatePattern = mapDateDescriptor.getTargetDatePattern();
+  }
+
+  /** @return the constant {@code map_date} */
+  @Override
+  public String getName() {
+    return "map_date";
+  }
+
+  public String getSourceDatePattern() {
+    return sourceDatePattern;
+  }
+
+  public void setSourceDatePattern(String sourceDatePattern) {
+    this.sourceDatePattern = sourceDatePattern;
+  }
+
+  public String getTargetDatePattern() {
+    return targetDatePattern;
+  }
+
+  public void setTargetDatePattern(String targetDatePattern) {
+    this.targetDatePattern = targetDatePattern;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java
new file mode 100644
index 0000000..b18439c
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapField.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Common parent of the mapper models; the {@code name} property is excluded from their JSON form.
+ */
+@ApiModel
+@JsonIgnoreProperties({ "name" })
+public abstract class LSServerMapField {
+  /** @return the name of the mapper */
+  public abstract String getName();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldCopy.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldCopy.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldCopy.java
new file mode 100644
index 0000000..b0bea83
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldCopy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Model of the {@code map_fieldcopy} mapper, carrying the copy name.
+ */
+@ApiModel
+public class LSServerMapFieldCopy extends LSServerMapField {
+  @JsonProperty("copy_name")
+  private String copyName;
+
+  /**
+   * Copies the copy name out of the descriptor.
+   *
+   * @param mapFieldCopyDescriptor the descriptor to read the copy name from
+   */
+  public LSServerMapFieldCopy(MapFieldCopyDescriptor mapFieldCopyDescriptor) {
+    copyName = mapFieldCopyDescriptor.getCopyName();
+  }
+
+  /** @return the constant {@code map_fieldcopy} */
+  @Override
+  public String getName() {
+    return "map_fieldcopy";
+  }
+
+  public String getCopyName() {
+    return copyName;
+  }
+
+  public void setCopyName(String copyName) {
+    this.copyName = copyName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldName.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldName.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldName.java
new file mode 100644
index 0000000..000b29d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldName.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Model of the {@code map_fieldname} mapper, carrying the new field name.
+ */
+@ApiModel
+public class LSServerMapFieldName extends LSServerMapField {
+  @JsonProperty("new_field_name")
+  private String newFieldName;
+
+  /**
+   * Copies the new field name out of the descriptor.
+   *
+   * @param mapFieldNameDescriptor the descriptor to read the new field name from
+   */
+  public LSServerMapFieldName(MapFieldNameDescriptor mapFieldNameDescriptor) {
+    newFieldName = mapFieldNameDescriptor.getNewFieldName();
+  }
+
+  /** @return the constant {@code map_fieldname} */
+  @Override
+  public String getName() {
+    return "map_fieldname";
+  }
+
+  public String getNewFieldName() {
+    return newFieldName;
+  }
+
+  public void setNewFieldName(String newFieldName) {
+    this.newFieldName = newFieldName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldValue.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldValue.java
new file mode 100644
index 0000000..6152de5
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerMapFieldValue.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Model of the {@code map_fieldvalue} mapper, carrying the pre and the post value.
+ */
+@ApiModel
+public class LSServerMapFieldValue extends LSServerMapField {
+  @JsonProperty("pre_value")
+  private String preValue;
+
+  @JsonProperty("post_value")
+  private String postValue;
+
+  /**
+   * Copies the pre and the post value out of the descriptor.
+   *
+   * @param mapFieldValueDescriptor the descriptor to read the two values from
+   */
+  public LSServerMapFieldValue(MapFieldValueDescriptor mapFieldValueDescriptor) {
+    preValue = mapFieldValueDescriptor.getPreValue();
+    postValue = mapFieldValueDescriptor.getPostValue();
+  }
+
+  /** @return the constant {@code map_fieldvalue} */
+  @Override
+  public String getName() {
+    return "map_fieldvalue";
+  }
+
+  public String getPreValue() {
+    return preValue;
+  }
+
+  public void setPreValue(String preValue) {
+    this.preValue = preValue;
+  }
+
+  public String getPostValue() {
+    return postValue;
+  }
+
+  public void setPostValue(String postValue) {
+    this.postValue = postValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValues.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValues.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValues.java
new file mode 100644
index 0000000..5f361c9
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValues.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Model of a list of mappers, converted from the mapper descriptors of a {@link PostMapValues}.
+ * Serialized by {@link LSServerPostMapValuesSerializer}.
+ */
+@ApiModel
+@JsonSerialize(using = LSServerPostMapValuesSerializer.class)
+public class LSServerPostMapValues {
+  private List<LSServerMapField> mappers;
+
+  /**
+   * Converts every known mapper descriptor to its server side model, keeping their order.
+   *
+   * @param pmv the descriptor holding the mappers; a {@code null} mapper list results in an empty list
+   */
+  public LSServerPostMapValues(PostMapValues pmv) {
+    mappers = new ArrayList<>();
+    Iterable<? extends MapFieldDescriptor> descriptors = pmv.getMappers();
+    if (descriptors == null) {
+      return;
+    }
+
+    for (MapFieldDescriptor descriptor : descriptors) {
+      LSServerMapField mapField = toServerMapField(descriptor);
+      if (mapField != null) {
+        mappers.add(mapField);
+      }
+    }
+  }
+
+  /**
+   * Picks the server side model matching the runtime type of the descriptor.
+   *
+   * @return the converted mapper, or {@code null} for a descriptor type not handled here
+   */
+  private static LSServerMapField toServerMapField(MapFieldDescriptor descriptor) {
+    if (descriptor instanceof MapDateDescriptor) {
+      return new LSServerMapDate((MapDateDescriptor) descriptor);
+    }
+    if (descriptor instanceof MapFieldCopyDescriptor) {
+      return new LSServerMapFieldCopy((MapFieldCopyDescriptor) descriptor);
+    }
+    if (descriptor instanceof MapFieldNameDescriptor) {
+      return new LSServerMapFieldName((MapFieldNameDescriptor) descriptor);
+    }
+    if (descriptor instanceof MapFieldValueDescriptor) {
+      return new LSServerMapFieldValue((MapFieldValueDescriptor) descriptor);
+    }
+    // Descriptor types without a matching model are left out of the list instead of failing.
+    return null;
+  }
+
+  public List<LSServerMapField> getMappers() {
+    return mappers;
+  }
+
+  public void setMappers(List<LSServerMapField> mappers) {
+    this.mappers = mappers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesSerializer.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesSerializer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesSerializer.java
new file mode 100644
index 0000000..7543677
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerPostMapValuesSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+/**
+ * Writes a {@link LSServerPostMapValues} as one JSON object, in which every mapper is a field named by
+ * {@link LSServerMapField#getName()}.
+ */
+public class LSServerPostMapValuesSerializer extends JsonSerializer<LSServerPostMapValues> {
+  /**
+   * Emits the mappers in list order; a {@code null} mapper list is written as an empty object.
+   */
+  @Override
+  public void serialize(LSServerPostMapValues value, JsonGenerator jgen, SerializerProvider provider)
+      throws IOException, JsonProcessingException {
+    jgen.writeStartObject();
+    List<LSServerMapField> mappers = value.getMappers();
+    if (mappers != null) {
+      for (LSServerMapField mapField : mappers) {
+        jgen.writeObjectField(mapField.getName(), mapField);
+      }
+    }
+    jgen.writeEndObject();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/rest/ShipperConfigResource.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/rest/ShipperConfigResource.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/rest/ShipperConfigResource.java
index 342d1cf..a7d99c9 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/rest/ShipperConfigResource.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/rest/ShipperConfigResource.java
@@ -33,6 +33,7 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 
 import org.apache.ambari.logsearch.manager.ShipperConfigManager;
+import org.apache.ambari.logsearch.model.common.LSServerInputConfig;
 import org.apache.ambari.logsearch.model.common.LSServerLogLevelFilterMap;
 import org.springframework.context.annotation.Scope;
 
@@ -65,7 +66,8 @@ public class ShipperConfigResource {
   @Path("/input/{clusterName}/services/{serviceName}")
   @Produces({"application/json"})
   @ApiOperation(GET_SHIPPER_CONFIG_OD)
-  public String getShipperConfig(@PathParam("clusterName") String clusterName, @PathParam("serviceName") String serviceName) {
+  public LSServerInputConfig getShipperConfig(@PathParam("clusterName") String clusterName, @PathParam("serviceName")
+    String serviceName) {
     return shipperConfigManager.getInputConfig(clusterName, serviceName);
   }
 
@@ -99,7 +101,7 @@ public class ShipperConfigResource {
   @Path("/filters/{clusterName}/level")
   @Produces({"application/json"})
   @ApiOperation(UPDATE_LOG_LEVEL_FILTER_OD)
-  public Response setogLevelFilter(LSServerLogLevelFilterMap request, @PathParam("clusterName") String clusterName) {
+  public Response setLogLevelFilter(LSServerLogLevelFilterMap request, @PathParam("clusterName") String clusterName) {
     return shipperConfigManager.setLogLevelFilters(clusterName, request);
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
index d171803..fb7ddf2 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
@@ -29,3 +29,4 @@ logfeeder.cache.key.field=log_message
 logfeeder.cache.dedup.interval=1000
 logfeeder.cache.last.dedup.enabled=true
 logsearch.config.zk_connect_string=localhost:9983
+logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/docker/test-config/logsearch/logsearch.properties
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logsearch/logsearch.properties b/ambari-logsearch/docker/test-config/logsearch/logsearch.properties
index 684d1dc..5bde17c 100644
--- a/ambari-logsearch/docker/test-config/logsearch/logsearch.properties
+++ b/ambari-logsearch/docker/test-config/logsearch/logsearch.properties
@@ -43,10 +43,6 @@ logsearch.collection.history.replication.factor=1
 logsearch.solr.metrics.collector.hosts=
 logsearch.solr.jmx.port=18886
 
-# Logfeeder Settings
-
-logsearch.logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN
-
 # logsearch-admin.json
 logsearch.auth.file.enable=true
 logsearch.login.credentials.file=user_pass.json


[2/4] ambari git commit: AMBARI-21033 Log Search use POJOs for input configuration (mgergely)

Posted by mg...@apache.org.
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
index ba872f8..4d6c43b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
@@ -70,7 +70,7 @@ public class OutputManager {
     Input input = inputMarker.input;
 
     // Update the block with the context fields
-    for (Map.Entry<String, String> entry : input.getContextFields().entrySet()) {
+    for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
       if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
         jsonObj.put(entry.getKey(), entry.getValue());
       }
@@ -79,13 +79,13 @@ public class OutputManager {
     // TODO: Ideally most of the overrides should be configurable
 
     if (jsonObj.get("type") == null) {
-      jsonObj.put("type", input.getStringValue("type"));
+      jsonObj.put("type", input.getInputDescriptor().getType());
     }
     if (jsonObj.get("path") == null && input.getFilePath() != null) {
       jsonObj.put("path", input.getFilePath());
     }
-    if (jsonObj.get("path") == null && input.getStringValue("path") != null) {
-      jsonObj.put("path", input.getStringValue("path"));
+    if (jsonObj.get("path") == null && input.getInputDescriptor().getPath() != null) {
+      jsonObj.put("path", input.getInputDescriptor().getPath());
     }
     if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
       jsonObj.put("host", LogFeederUtil.hostName);

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index d0f51b2..076d12d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -19,9 +19,6 @@
 package org.apache.ambari.logfeeder.output;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import org.apache.ambari.logfeeder.common.ConfigHandler;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.input.InputMarker;
@@ -31,11 +28,18 @@ import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
 import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.S3Util;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputS3FileDescriptorImpl;
 import org.apache.log4j.Logger;
 
 import java.io.File;
-import java.util.*;
-import java.util.Map.Entry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 
 
 /**
@@ -50,7 +54,6 @@ import java.util.Map.Entry;
 public class OutputS3File extends Output implements RolloverCondition, RolloverHandler {
   private static final Logger LOG = Logger.getLogger(OutputS3File.class);
 
-  public static final String INPUT_ATTRIBUTE_TYPE = "type";
   public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json";
 
   private LogSpooler logSpooler;
@@ -72,9 +75,9 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
    */
   @Override
   public void copyFile(File inputFile, InputMarker inputMarker) {
-    String type = inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE);
+    String type = inputMarker.input.getInputDescriptor().getType();
     S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, false, type);
-    String resolvedPath = s3Uploader.uploadFile(inputFile, inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
+    String resolvedPath = s3Uploader.uploadFile(inputFile, inputMarker.input.getInputDescriptor().getType());
 
     uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath);
   }
@@ -82,43 +85,43 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
   private void uploadConfig(InputMarker inputMarker, String type, S3OutputConfiguration s3OutputConfiguration,
       String resolvedPath) {
 
-    ArrayList<Map<String, Object>> filters = new ArrayList<>();
+    ArrayList<FilterDescriptor> filters = new ArrayList<>();
     addFilters(filters, inputMarker.input.getFirstFilter());
-    Map<String, Object> inputConfig = new HashMap<>();
-    inputConfig.putAll(inputMarker.input.getConfigs());
+    InputS3FileDescriptor inputS3FileDescriptorOriginal = (InputS3FileDescriptor) inputMarker.input.getInputDescriptor();
+    InputS3FileDescriptorImpl inputS3FileDescriptor = InputConfigGson.gson.fromJson(
+        InputConfigGson.gson.toJson(inputS3FileDescriptorOriginal), InputS3FileDescriptorImpl.class);
     String s3CompletePath = LogFeederConstants.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName() +
         LogFeederConstants.S3_PATH_SEPARATOR + resolvedPath;
-    inputConfig.put("path", s3CompletePath);
+    inputS3FileDescriptor.setPath(s3CompletePath);
 
-    ArrayList<Map<String, Object>> inputConfigList = new ArrayList<>();
-    inputConfigList.add(inputConfig);
+    ArrayList<InputDescriptorImpl> inputConfigList = new ArrayList<>();
+    inputConfigList.add(inputS3FileDescriptor);
     // set source s3_file
-    // remove global config from filter config
-    removeGlobalConfig(inputConfigList);
-    removeGlobalConfig(filters);
+    // remove global config from input config
+    removeS3GlobalConfig(inputS3FileDescriptor);
     // write config into s3 file
-    Map<String, Object> config = new HashMap<>();
-    config.put("filter", filters);
-    config.put("input", inputConfigList);
-    writeConfigToS3(config, getComponentConfigFileName(type), s3OutputConfiguration);
+    InputConfigImpl inputConfig = new InputConfigImpl();
+    inputConfig.setInput(inputConfigList);
+    
+    writeConfigToS3(inputConfig, getComponentConfigFileName(type), s3OutputConfiguration);
     // write global config
     writeGlobalConfig(s3OutputConfiguration);
   }
 
-  private void addFilters(ArrayList<Map<String, Object>> filters, Filter filter) {
+  private void addFilters(ArrayList<FilterDescriptor> filters, Filter filter) {
     if (filter != null) {
-      Map<String, Object> filterConfig = new HashMap<String, Object>();
-      filterConfig.putAll(filter.getConfigs());
-      filters.add(filterConfig);
+      FilterDescriptor filterDescriptorOriginal = filter.getFilterDescriptor();
+      FilterDescriptor filterDescriptor = InputConfigGson.gson.fromJson(
+          InputConfigGson.gson.toJson(filterDescriptorOriginal), filterDescriptorOriginal.getClass());
+      filters.add(filterDescriptor);
       if (filter.getNextFilter() != null) {
         addFilters(filters, filter.getNextFilter());
       }
     }
   }
 
-  private void writeConfigToS3(Map<String, Object> configToWrite, String s3KeySuffix, S3OutputConfiguration s3OutputConfiguration) {
-    Gson gson = new GsonBuilder().setPrettyPrinting().create();
-    String configJson = gson.toJson(configToWrite);
+  private void writeConfigToS3(Object config, String s3KeySuffix, S3OutputConfiguration s3OutputConfiguration) {
+    String configJson = InputConfigGson.gson.toJson(config);
 
     String s3ResolvedKey = new S3LogPathResolver().getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix,
         s3OutputConfiguration.getCluster());
@@ -131,31 +134,14 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
     return "input.config-" + componentName + ".json";
   }
 
-
-  private Map<String, Object> getGlobalConfig() {
-    Map<String, Object> globalConfig = ConfigHandler.globalConfigs;
-    if (globalConfig == null) {
-      globalConfig = new HashMap<>();
-    }
-    return globalConfig;
-  }
-
-  private void removeGlobalConfig(List<Map<String, Object>> configList) {
-    Map<String, Object> globalConfig = getGlobalConfig();
-    if (configList != null && globalConfig != null) {
-      for (Entry<String, Object> globalConfigEntry : globalConfig.entrySet()) {
-        if (globalConfigEntry != null) {
-          String globalKey = globalConfigEntry.getKey();
-          if (globalKey != null && !globalKey.trim().isEmpty()) {
-            for (Map<String, Object> config : configList) {
-              if (config != null) {
-                config.remove(globalKey);
-              }
-            }
-          }
-        }
-      }
-    }
+  private void removeS3GlobalConfig(InputS3FileDescriptorImpl inputS3FileDescriptor) {
+    inputS3FileDescriptor.setSource(null);
+    inputS3FileDescriptor.setCopyFile(null);
+    inputS3FileDescriptor.setProcessFile(null);
+    inputS3FileDescriptor.setTail(null);
+    inputS3FileDescriptor.getAddFields().remove("ip");
+    inputS3FileDescriptor.getAddFields().remove("host");
+    inputS3FileDescriptor.getAddFields().remove("bundle_id");
   }
 
   /**
@@ -164,7 +150,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
   @SuppressWarnings("unchecked")
   private synchronized void writeGlobalConfig(S3OutputConfiguration s3OutputConfiguration) {
     if (!uploadedGlobalConfig) {
-      Map<String, Object> globalConfig = LogFeederUtil.cloneObject(getGlobalConfig());
+      Map<String, Object> globalConfig = new HashMap<>();
       //updating global config before write to s3
       globalConfig.put("source", "s3_file");
       globalConfig.put("copy_file", false);
@@ -205,7 +191,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
   public void write(String block, InputMarker inputMarker) throws Exception {
     if (logSpooler == null) {
       logSpooler = createSpooler(inputMarker.input.getFilePath());
-      s3Uploader = createUploader(inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
+      s3Uploader = createUploader(inputMarker.input.getInputDescriptor().getType());
     }
     logSpooler.add(block);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
index 1929178..d8a1fbb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
@@ -27,13 +27,11 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Hashtable;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.ambari.logfeeder.LogFeeder;
 import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -205,55 +203,6 @@ public class LogFeederUtil {
     return retValue;
   }
 
-  @SuppressWarnings("unchecked")
-  public static boolean isEnabled(Map<String, Object> conditionConfigs, Map<String, Object> valueConfigs) {
-    Map<String, Object> conditions = (Map<String, Object>) conditionConfigs.get("conditions");
-    if (MapUtils.isEmpty(conditions)) {
-      return toBoolean((String) valueConfigs.get("is_enabled"), true);
-    }
-    
-    for (String conditionType : conditions.keySet()) {
-      if (!conditionType.equalsIgnoreCase("fields")) {
-        continue;
-      }
-      
-      Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
-      for (Map.Entry<String, Object> field : fields.entrySet()) {
-        if (field.getValue() instanceof String) {
-          if (isFieldConditionMatch(valueConfigs, field.getKey(), (String) field.getValue())) {
-            return true;
-          }
-        } else {
-          for (String stringValue : (List<String>) field.getValue()) {
-            if (isFieldConditionMatch(valueConfigs, field.getKey(), stringValue)) {
-              return true;
-            }
-          }
-        }
-      }
-    }
-    
-    return false;
-  }
-
-  private static boolean isFieldConditionMatch(Map<String, Object> configs, String fieldName, String stringValue) {
-    boolean allow = false;
-    String fieldValue = (String) configs.get(fieldName);
-    if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
-      allow = true;
-    } else {
-      @SuppressWarnings("unchecked")
-      Map<String, Object> addFields = (Map<String, Object>) configs.get("add_fields");
-      if (addFields != null && addFields.get(fieldName) != null) {
-        String addFieldValue = (String) addFields.get(fieldName);
-        if (stringValue.equalsIgnoreCase(addFieldValue)) {
-          allow = true;
-        }
-      }
-    }
-    return allow;
-  }
-
   public static void logStatForMetric(MetricData metric, String prefixStr, String postFix) {
     long currStat = metric.value;
     long currMS = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
index 99565c5..8d7e86c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
@@ -18,12 +18,13 @@
 
 package org.apache.ambari.logfeeder.filter;
 
-import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterGrokDescriptorImpl;
 import org.apache.log4j.Logger;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
@@ -43,12 +44,12 @@ public class FilterGrokTest {
   private OutputManager mockOutputManager;
   private Capture<Map<String, Object>> capture;
 
-  public void init(Map<String, Object> config) throws Exception {
+  public void init(FilterGrokDescriptor filterGrokDescriptor) throws Exception {
     mockOutputManager = EasyMock.strictMock(OutputManager.class);
     capture = EasyMock.newCapture(CaptureType.LAST);
 
     filterGrok = new FilterGrok();
-    filterGrok.loadConfig(config);
+    filterGrok.loadConfig(filterGrokDescriptor);
     filterGrok.setOutputManager(mockOutputManager);
     filterGrok.setInput(EasyMock.mock(Input.class));
     filterGrok.init();
@@ -58,10 +59,10 @@ public class FilterGrokTest {
   public void testFilterGrok_parseMessage() throws Exception {
     LOG.info("testFilterGrok_parseMessage()");
 
-    Map<String, Object> config = new HashMap<String, Object>();
-    config.put("message_pattern", "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
-    config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
-    init(config);
+    FilterGrokDescriptorImpl filterGrokDescriptor = new FilterGrokDescriptorImpl();
+    filterGrokDescriptor.setMessagePattern("(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+    filterGrokDescriptor.setMultilinePattern("^(%{TIMESTAMP_ISO8601:logtime})");
+    init(filterGrokDescriptor);
 
     mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
@@ -84,10 +85,10 @@ public class FilterGrokTest {
   public void testFilterGrok_parseMultiLineMessage() throws Exception {
     LOG.info("testFilterGrok_parseMultiLineMessage()");
 
-    Map<String, Object> config = new HashMap<String, Object>();
-    config.put("message_pattern", "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
-    config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
-    init(config);
+    FilterGrokDescriptorImpl filterGrokDescriptor = new FilterGrokDescriptorImpl();
+    filterGrokDescriptor.setMessagePattern("(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+    filterGrokDescriptor.setMultilinePattern("^(%{TIMESTAMP_ISO8601:logtime})");
+    init(filterGrokDescriptor);
 
     mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
@@ -114,10 +115,10 @@ public class FilterGrokTest {
   public void testFilterGrok_notMatchingMesagePattern() throws Exception {
     LOG.info("testFilterGrok_notMatchingMesagePattern()");
 
-    Map<String, Object> config = new HashMap<String, Object>();
-    config.put("message_pattern", "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
-    config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
-    init(config);
+    FilterGrokDescriptorImpl filterGrokDescriptor = new FilterGrokDescriptorImpl();
+    filterGrokDescriptor.setMessagePattern("(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+    filterGrokDescriptor.setMultilinePattern("^(%{TIMESTAMP_ISO8601:logtime})");
+    init(filterGrokDescriptor);
 
     mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall().anyTimes();
@@ -134,9 +135,9 @@ public class FilterGrokTest {
   public void testFilterGrok_noMesagePattern() throws Exception {
     LOG.info("testFilterGrok_noMesagePattern()");
 
-    Map<String, Object> config = new HashMap<String, Object>();
-    config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
-    init(config);
+    FilterGrokDescriptorImpl filterGrokDescriptor = new FilterGrokDescriptorImpl();
+    filterGrokDescriptor.setMultilinePattern("^(%{TIMESTAMP_ISO8601:logtime})");
+    init(filterGrokDescriptor);
 
     EasyMock.replay(mockOutputManager);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
index 643dafc..8f75c3a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
@@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.filter;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
 
@@ -29,6 +28,7 @@ import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl;
 import org.apache.log4j.Logger;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
@@ -47,12 +47,12 @@ public class FilterJSONTest {
   private OutputManager mockOutputManager;
   private Capture<Map<String, Object>> capture;
 
-  public void init(Map<String, Object> params) throws Exception {
+  public void init(FilterJsonDescriptorImpl filterJsonDescriptor) throws Exception {
     mockOutputManager = EasyMock.strictMock(OutputManager.class);
     capture = EasyMock.newCapture(CaptureType.LAST);
 
     filterJson = new FilterJSON();
-    filterJson.loadConfig(params);
+    filterJson.loadConfig(filterJsonDescriptor);
     filterJson.setOutputManager(mockOutputManager);
     filterJson.init();
   }
@@ -61,7 +61,7 @@ public class FilterJSONTest {
   public void testJSONFilterCode_convertFields() throws Exception {
     LOG.info("testJSONFilterCode_convertFields()");
 
-    init(new HashMap<String, Object>());
+    init(new FilterJsonDescriptorImpl());
 
     mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
@@ -86,7 +86,7 @@ public class FilterJSONTest {
   public void testJSONFilterCode_logTimeOnly() throws Exception {
     LOG.info("testJSONFilterCode_logTimeOnly()");
 
-    init(new HashMap<String, Object>());
+    init(new FilterJsonDescriptorImpl());
 
     mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
@@ -111,7 +111,7 @@ public class FilterJSONTest {
   public void testJSONFilterCode_lineNumberOnly() throws Exception {
     LOG.info("testJSONFilterCode_lineNumberOnly()");
 
-    init(new HashMap<String, Object>());
+    init(new FilterJsonDescriptorImpl());
 
     mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
@@ -131,7 +131,7 @@ public class FilterJSONTest {
   @Test
   public void testJSONFilterCode_invalidJson() throws Exception {
     LOG.info("testJSONFilterCode_invalidJson()");
-    init(new HashMap<String, Object>());
+    init(new FilterJsonDescriptorImpl());
     String inputStr="invalid json";
     try{
     filterJson.apply(inputStr,new InputMarker(null, null, 0));

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
index 05647e6..ae978fb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
@@ -18,10 +18,11 @@
 
 package org.apache.ambari.logfeeder.filter;
 
-import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterKeyValueDescriptorImpl;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.log4j.Logger;
 import org.easymock.Capture;
@@ -41,12 +42,12 @@ public class FilterKeyValueTest {
   private OutputManager mockOutputManager;
   private Capture<Map<String, Object>> capture;
 
-  public void init(Map<String, Object> config) throws Exception {
+  public void init(FilterKeyValueDescriptor filterKeyValueDescriptor) throws Exception {
     mockOutputManager = EasyMock.strictMock(OutputManager.class);
     capture = EasyMock.newCapture(CaptureType.LAST);
 
     filterKeyValue = new FilterKeyValue();
-    filterKeyValue.loadConfig(config);
+    filterKeyValue.loadConfig(filterKeyValueDescriptor);
     filterKeyValue.setOutputManager(mockOutputManager);
     filterKeyValue.init();
   }
@@ -55,11 +56,10 @@ public class FilterKeyValueTest {
   public void testFilterKeyValue_extraction() throws Exception {
     LOG.info("testFilterKeyValue_extraction()");
 
-    Map<String, Object> config = new HashMap<String, Object>();
-    config.put("source_field", "keyValueField");
-    config.put("field_split", "&");
-    // using default value split:
-    init(config);
+    FilterKeyValueDescriptorImpl filterKeyValueDescriptor = new FilterKeyValueDescriptorImpl();
+    filterKeyValueDescriptor.setSourceField("keyValueField");
+    filterKeyValueDescriptor.setFieldSplit("&");
+    init(filterKeyValueDescriptor);
 
     mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
@@ -80,11 +80,11 @@ public class FilterKeyValueTest {
   public void testFilterKeyValue_extractionWithBorders() throws Exception {
     LOG.info("testFilterKeyValue_extractionWithBorders()");
 
-    Map<String, Object> config = new HashMap<String, Object>();
-    config.put("source_field", "keyValueField");
-    config.put("field_split", "&");
-    config.put("value_borders", "()");
-    init(config);
+    FilterKeyValueDescriptorImpl filterKeyValueDescriptor = new FilterKeyValueDescriptorImpl();
+    filterKeyValueDescriptor.setSourceField("keyValueField");
+    filterKeyValueDescriptor.setFieldSplit("&");
+    filterKeyValueDescriptor.setValueBorders("()");
+    init(filterKeyValueDescriptor);
 
     mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall();
@@ -105,10 +105,9 @@ public class FilterKeyValueTest {
   public void testFilterKeyValue_missingSourceField() throws Exception {
     LOG.info("testFilterKeyValue_missingSourceField()");
 
-    Map<String, Object> config = new HashMap<String, Object>();
-    config.put("field_split", "&");
-    // using default value split: =
-    init(config);
+    FilterKeyValueDescriptorImpl filterKeyValueDescriptor = new FilterKeyValueDescriptorImpl();
+    filterKeyValueDescriptor.setFieldSplit("&");
+    init(filterKeyValueDescriptor);
 
     mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
     EasyMock.expectLastCall().anyTimes();
@@ -124,10 +123,10 @@ public class FilterKeyValueTest {
   public void testFilterKeyValue_noSourceFieldPresent() throws Exception {
     LOG.info("testFilterKeyValue_noSourceFieldPresent()");
 
-    Map<String, Object> config = new HashMap<String, Object>();
-    config.put("source_field", "keyValueField");
-    config.put("field_split", "&");
-    init(config);
+    FilterKeyValueDescriptorImpl filterKeyValueDescriptor = new FilterKeyValueDescriptorImpl();
+    filterKeyValueDescriptor.setSourceField("keyValueField");
+    filterKeyValueDescriptor.setFieldSplit("&");
+    init(filterKeyValueDescriptor);
 
     // using default value split: =
     mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
index 522f6bb..3a5f31e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
@@ -22,12 +22,11 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputFileDescriptorImpl;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 import org.easymock.EasyMock;
@@ -78,15 +77,14 @@ public class InputFileTest {
   }
 
   public void init(String path) throws Exception {
-    Map<String, Object> config = new HashMap<String, Object>();
-    config.put("source", "file");
-    config.put("tail", "true");
-    config.put("gen_event_md5", "true");
-    config.put("start_position", "beginning");
-
-    config.put("type", "hdfs_datanode");
-    config.put("rowtype", "service");
-    config.put("path", path);
+    InputFileDescriptorImpl inputFileDescriptor = new InputFileDescriptorImpl();
+    inputFileDescriptor.setSource("file");
+    inputFileDescriptor.setTail(true);
+    inputFileDescriptor.setGenEventMd5(true);
+    inputFileDescriptor.setStartPosition("beginning");
+    inputFileDescriptor.setType("hdfs_datanode");
+    inputFileDescriptor.setRowtype("service");
+    inputFileDescriptor.setPath(path);
 
     Filter capture = new Filter() {
       @Override
@@ -104,7 +102,7 @@ public class InputFileTest {
     };
 
     inputFile = new InputFile();
-    inputFile.loadConfig(config);
+    inputFile.loadConfig(inputFileDescriptor);
     inputFile.addFilter(capture);
     inputFile.init();
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
index 44314c6..4123dad 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandlerTest.java
@@ -21,13 +21,10 @@ package org.apache.ambari.logfeeder.logconfig;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 
 import static org.easymock.EasyMock.*;
 import static org.junit.Assert.*;
 
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData;
@@ -36,6 +33,7 @@ import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
 import org.apache.commons.lang.time.DateUtils;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -44,16 +42,18 @@ public class LogConfigHandlerTest {
   private static InputMarker inputMarkerAudit;
   private static InputMarker inputMarkerService;
   static {
-    Map<String, Object> auditMap = new HashMap<String, Object>();
-    auditMap.put(LogFeederConstants.ROW_TYPE, "audit");
+    InputDescriptorImpl auditInputDescriptor = new InputDescriptorImpl() {};
+    auditInputDescriptor.setRowtype("audit");
+    
     Input auditInput = strictMock(Input.class);
-    expect(auditInput.getConfigs()).andReturn(auditMap).anyTimes();
+    expect(auditInput.getInputDescriptor()).andReturn(auditInputDescriptor).anyTimes();
     inputMarkerAudit = new InputMarker(auditInput, null, 0);
     
-    Map<String, Object> serviceMap = new HashMap<String, Object>();
-    serviceMap.put(LogFeederConstants.ROW_TYPE, "service");
+    InputDescriptorImpl serviceInputDescriptor = new InputDescriptorImpl() {};
+    serviceInputDescriptor.setRowtype("service");
+    
     Input serviceInput = strictMock(Input.class);
-    expect(serviceInput.getConfigs()).andReturn(serviceMap).anyTimes();
+    expect(serviceInput.getInputDescriptor()).andReturn(serviceInputDescriptor).anyTimes();
     inputMarkerService = new InputMarker(serviceInput, null, 0);
     
     replay(auditInput, serviceInput);

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
index 8beecda..0a0a9fd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.MapDateDescriptorImpl;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.log4j.Logger;
 import org.junit.Test;
@@ -40,11 +41,11 @@ public class MapperDateTest {
   public void testMapperDate_epoch() {
     LOG.info("testMapperDate_epoch()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("target_date_pattern", "epoch");
+    MapDateDescriptorImpl mapDateDescriptor = new MapDateDescriptorImpl();
+    mapDateDescriptor.setTargetDatePattern("epoch");
 
     MapperDate mapperDate = new MapperDate();
-    assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+    assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor));
 
     Map<String, Object> jsonObj = new HashMap<>();
 
@@ -61,11 +62,11 @@ public class MapperDateTest {
   public void testMapperDate_pattern() throws Exception {
     LOG.info("testMapperDate_pattern()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("target_date_pattern", "yyyy-MM-dd HH:mm:ss.SSS");
+    MapDateDescriptorImpl mapDateDescriptor = new MapDateDescriptorImpl();
+    mapDateDescriptor.setTargetDatePattern("yyyy-MM-dd HH:mm:ss.SSS");
 
     MapperDate mapperDate = new MapperDate();
-    assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+    assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor));
 
     Map<String, Object> jsonObj = new HashMap<>();
     String dateString = "2016-04-08 15:55:23.548";
@@ -80,44 +81,35 @@ public class MapperDateTest {
   }
 
   @Test
-  public void testMapperDate_configNotMap() {
-    LOG.info("testMapperDate_configNotMap()");
-
-    MapperDate mapperDate = new MapperDate();
-    assertFalse("Was able to initialize!", mapperDate.init(null, "someField", null, ""));
-  }
-
-  @Test
   public void testMapperDate_noDatePattern() {
     LOG.info("testMapperDate_noDatePattern()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("some_param", "some_value");
+    MapDateDescriptorImpl mapDateDescriptor = new MapDateDescriptorImpl();
 
     MapperDate mapperDate = new MapperDate();
-    assertFalse("Was able to initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+    assertFalse("Was able to initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor));
   }
 
   @Test
   public void testMapperDate_notParsableDatePattern() {
     LOG.info("testMapperDate_notParsableDatePattern()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("target_date_pattern", "not_parsable_content");
+    MapDateDescriptorImpl mapDateDescriptor = new MapDateDescriptorImpl();
+    mapDateDescriptor.setTargetDatePattern("not_parsable_content");
 
     MapperDate mapperDate = new MapperDate();
-    assertFalse("Was able to initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+    assertFalse("Was able to initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor));
   }
 
   @Test
   public void testMapperDate_invalidEpochValue() {
     LOG.info("testMapperDate_invalidEpochValue()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("target_date_pattern", "epoch");
+    MapDateDescriptorImpl mapDateDescriptor = new MapDateDescriptorImpl();
+    mapDateDescriptor.setTargetDatePattern("epoch");
 
     MapperDate mapperDate = new MapperDate();
-    assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+    assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor));
 
     Map<String, Object> jsonObj = new HashMap<>();
     String invalidValue = "abc";
@@ -131,11 +123,11 @@ public class MapperDateTest {
   public void testMapperDate_invalidDateStringValue() {
     LOG.info("testMapperDate_invalidDateStringValue()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("target_date_pattern", "yyyy-MM-dd HH:mm:ss.SSS");
+    MapDateDescriptorImpl mapDateDescriptor = new MapDateDescriptorImpl();
+    mapDateDescriptor.setTargetDatePattern("yyyy-MM-dd HH:mm:ss.SSS");
 
     MapperDate mapperDate = new MapperDate();
-    assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+    assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapDateDescriptor));
 
     Map<String, Object> jsonObj = new HashMap<>();
     String invalidValue = "abc";

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopyTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopyTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopyTest.java
index 108c96e..4899dfc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopyTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopyTest.java
@@ -21,6 +21,7 @@ package org.apache.ambari.logfeeder.mapper;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.MapFieldCopyDescriptorImpl;
 import org.apache.log4j.Logger;
 import org.junit.Test;
 
@@ -35,11 +36,11 @@ public class MapperFieldCopyTest {
   public void testMapperFieldCopy_copyField() {
     LOG.info("testMapperFieldCopy_copyField()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("copy_name", "someOtherField");
+    MapFieldCopyDescriptorImpl mapFieldCopyDescriptor = new MapFieldCopyDescriptorImpl();
+    mapFieldCopyDescriptor.setCopyName("someOtherField");
 
     MapperFieldCopy mapperFieldCopy = new MapperFieldCopy();
-    assertTrue("Could not initialize!", mapperFieldCopy.init(null, "someField", null, mapConfigs));
+    assertTrue("Could not initialize!", mapperFieldCopy.init(null, "someField", null, mapFieldCopyDescriptor));
 
     Map<String, Object> jsonObj = new HashMap<>();
     jsonObj.put("someField", "someValue");
@@ -52,20 +53,12 @@ public class MapperFieldCopyTest {
   }
 
   @Test
-  public void testMapperFielCopy_configNotMap() {
-    LOG.info("testMapperFieldCopy_configNotMap()");
-
-    MapperFieldCopy mapperFieldCopy = new MapperFieldCopy();
-    assertFalse("Was able to initialize!", mapperFieldCopy.init(null, "someField", null, ""));
-  }
-
-  @Test
   public void testMapperFieldCopy_noNewFieldName() {
     LOG.info("testMapperFieldCopy_noNewFieldName()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
+    MapFieldCopyDescriptorImpl mapFieldCopyDescriptor = new MapFieldCopyDescriptorImpl();
 
     MapperFieldCopy mapperFieldCopy = new MapperFieldCopy();
-    assertFalse("Was able to initialize!", mapperFieldCopy.init(null, "someField", null, mapConfigs));
+    assertFalse("Was able to initialize!", mapperFieldCopy.init(null, "someField", null, mapFieldCopyDescriptor));
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
index 8ecaad1..74b88fc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
@@ -21,6 +21,7 @@ package org.apache.ambari.logfeeder.mapper;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.MapFieldNameDescriptorImpl;
 import org.apache.log4j.Logger;
 import org.junit.Test;
 
@@ -35,11 +36,11 @@ public class MapperFieldNameTest {
   public void testMapperFieldName_replaceField() {
     LOG.info("testMapperFieldName_replaceField()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("new_fieldname", "someOtherField");
+    MapFieldNameDescriptorImpl mapFieldNameDescriptor = new MapFieldNameDescriptorImpl();
+    mapFieldNameDescriptor.setNewFieldName("someOtherField");
 
     MapperFieldName mapperFieldName = new MapperFieldName();
-    assertTrue("Could not initialize!", mapperFieldName.init(null, "someField", null, mapConfigs));
+    assertTrue("Could not initialize!", mapperFieldName.init(null, "someField", null, mapFieldNameDescriptor));
 
     Map<String, Object> jsonObj = new HashMap<>();
     jsonObj.put("someField", "someValue");
@@ -52,20 +53,12 @@ public class MapperFieldNameTest {
   }
 
   @Test
-  public void testMapperFieldName_configNotMap() {
-    LOG.info("testMapperFieldName_configNotMap()");
-
-    MapperFieldName mapperFieldName = new MapperFieldName();
-    assertFalse("Was able to initialize!", mapperFieldName.init(null, "someField", null, ""));
-  }
-
-  @Test
   public void testMapperFieldName_noNewFieldName() {
     LOG.info("testMapperFieldName_noNewFieldName()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
+    MapFieldNameDescriptorImpl mapFieldNameDescriptor = new MapFieldNameDescriptorImpl();
 
     MapperFieldName mapperFieldName = new MapperFieldName();
-    assertFalse("Was able to initialize!", mapperFieldName.init(null, "someField", null, mapConfigs));
+    assertFalse("Was able to initialize!", mapperFieldName.init(null, "someField", null, mapFieldNameDescriptor));
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
index fce4308..1a33740 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
@@ -21,6 +21,7 @@ package org.apache.ambari.logfeeder.mapper;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.MapFieldValueDescriptorImpl;
 import org.apache.log4j.Logger;
 import org.junit.Test;
 
@@ -35,12 +36,12 @@ public class MapperFieldValueTest {
   public void testMapperFieldValue_replaceValue() {
     LOG.info("testMapperFieldValue_replaceValue()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("pre_value", "someValue");
-    mapConfigs.put("post_value", "someOtherValue");
+    MapFieldValueDescriptorImpl mapFieldValueDescriptor = new MapFieldValueDescriptorImpl();
+    mapFieldValueDescriptor.setPreValue("someValue");
+    mapFieldValueDescriptor.setPostValue("someOtherValue");
 
     MapperFieldValue mapperFieldValue = new MapperFieldValue();
-    assertTrue("Could not initialize!", mapperFieldValue.init(null, "someField", null, mapConfigs));
+    assertTrue("Could not initialize!", mapperFieldValue.init(null, "someField", null, mapFieldValueDescriptor));
 
     Map<String, Object> jsonObj = new HashMap<>();
 
@@ -52,33 +53,25 @@ public class MapperFieldValueTest {
   }
 
   @Test
-  public void testMapperFieldValue_configNotMap() {
-    LOG.info("testMapperFieldValue_configNotMap()");
-
-    MapperFieldValue mapperFieldValue = new MapperFieldValue();
-    assertFalse("Was able to initialize!", mapperFieldValue.init(null, "someField", null, ""));
-  }
-
-  @Test
   public void testMapperFieldValue_noPostValue() {
     LOG.info("testMapperFieldValue_noPostValue()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
+    MapFieldValueDescriptorImpl mapFieldValueDescriptor = new MapFieldValueDescriptorImpl();
 
     MapperFieldValue mapperFieldValue = new MapperFieldValue();
-    assertFalse("Was able to initialize!", mapperFieldValue.init(null, "someField", null, mapConfigs));
+    assertFalse("Was able to initialize!", mapperFieldValue.init(null, "someField", null, mapFieldValueDescriptor));
   }
 
   @Test
   public void testMapperFieldValue_noPreValueFound() {
     LOG.info("testMapperFieldValue_noPreValueFound()");
 
-    Map<String, Object> mapConfigs = new HashMap<>();
-    mapConfigs.put("pre_value", "someValue");
-    mapConfigs.put("post_value", "someOtherValue");
+    MapFieldValueDescriptorImpl mapFieldValueDescriptor = new MapFieldValueDescriptorImpl();
+    mapFieldValueDescriptor.setPreValue("someValue");
+    mapFieldValueDescriptor.setPostValue("someOtherValue");
 
     MapperFieldValue mapperFieldValue = new MapperFieldValue();
-    assertTrue("Could not initialize!", mapperFieldValue.init(null, "someField", null, mapConfigs));
+    assertTrue("Could not initialize!", mapperFieldValue.init(null, "someField", null, mapFieldValueDescriptor));
 
     Map<String, Object> jsonObj = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
index 1ccc319..6e108ab 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
@@ -21,6 +21,8 @@ package org.apache.ambari.logfeeder.output;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.cache.LRUCache;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,7 +51,7 @@ public class OutputLineFilterTest {
   public void testApplyWithFilterOutByDedupInterval() {
     // GIVEN
     EasyMock.expect(inputMock.getCache()).andReturn(createLruCache(DEFAULT_DUMMY_MESSAGE, 100L, false));
-    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getInputDescriptor()).andReturn(generateInputDescriptor());
     EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
     EasyMock.replay(inputMock);
     // WHEN
@@ -63,7 +65,7 @@ public class OutputLineFilterTest {
   public void testApplyDoNotFilterOutDataByDedupInterval() {
     // GIVEN
     EasyMock.expect(inputMock.getCache()).andReturn(createLruCache(DEFAULT_DUMMY_MESSAGE, 10L, false));
-    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getInputDescriptor()).andReturn(generateInputDescriptor());
     EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
     EasyMock.replay(inputMock);
     // WHEN
@@ -77,7 +79,7 @@ public class OutputLineFilterTest {
   public void testApplyWithFilterOutByDedupLast() {
     // GIVEN
     EasyMock.expect(inputMock.getCache()).andReturn(createLruCache(DEFAULT_DUMMY_MESSAGE, 10L, true));
-    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getInputDescriptor()).andReturn(generateInputDescriptor());
     EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
     EasyMock.replay(inputMock);
     // WHEN
@@ -91,7 +93,7 @@ public class OutputLineFilterTest {
   public void testApplyDoNotFilterOutDataByDedupLast() {
     // GIVEN
     EasyMock.expect(inputMock.getCache()).andReturn(createLruCache("myMessage2", 10L, true));
-    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getInputDescriptor()).andReturn(generateInputDescriptor());
     EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
     EasyMock.replay(inputMock);
     // WHEN
@@ -117,7 +119,7 @@ public class OutputLineFilterTest {
   public void testApplyWithoutInMemoryTimestamp() {
     // GIVEN
     EasyMock.expect(inputMock.getCache()).andReturn(createLruCache(DEFAULT_DUMMY_MESSAGE, 100L, true));
-    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getInputDescriptor()).andReturn(generateInputDescriptor());
     EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
     EasyMock.replay(inputMock);
     Map<String, Object> lineMap = generateLineMap();
@@ -133,7 +135,7 @@ public class OutputLineFilterTest {
   public void testApplyWithoutLogMessage() {
     // GIVEN
     EasyMock.expect(inputMock.getCache()).andReturn(createLruCache(DEFAULT_DUMMY_MESSAGE, 100L, true));
-    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getInputDescriptor()).andReturn(generateInputDescriptor());
     EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
     EasyMock.replay(inputMock);
     Map<String, Object> lineMap = generateLineMap();
@@ -152,10 +154,10 @@ public class OutputLineFilterTest {
     return lineMap;
   }
 
-  private Map<String, Object> generateInputConfigs() {
-    Map<String, Object> inputConfigs = new HashMap<>();
-    inputConfigs.put(LogFeederConstants.ROW_TYPE, "service");
-    return inputConfigs;
+  private InputDescriptor generateInputDescriptor() {
+    InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
+    inputDescriptor.setRowtype("service");
+    return inputDescriptor;
   }
 
   private LRUCache createLruCache(String defaultKey, long defaultValue, boolean lastDedupEanabled) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
index cf1d25a..5abb720 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
 import org.junit.Test;
 
 public class OutputManagerTest {
@@ -91,15 +92,17 @@ public class OutputManagerTest {
     
     Input mockInput = strictMock(Input.class);
     InputMarker inputMarker = new InputMarker(mockInput, null, 0);
+    InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
+    inputDescriptor.setAddFields(Collections.<String, String> emptyMap());
     
     Output output1 = strictMock(Output.class);
     Output output2 = strictMock(Output.class);
     Output output3 = strictMock(Output.class);
     
-    expect(mockInput.getContextFields()).andReturn(Collections.<String, String> emptyMap());
+    expect(mockInput.getInputDescriptor()).andReturn(inputDescriptor);
     expect(mockInput.isUseEventMD5()).andReturn(false);
     expect(mockInput.isGenEventMD5()).andReturn(false);
-    expect(mockInput.getConfigs()).andReturn(Collections.<String, Object> emptyMap());
+    expect(mockInput.getInputDescriptor()).andReturn(inputDescriptor);
     expect(mockInput.getCache()).andReturn(null);
     expect(mockInput.getOutputList()).andReturn(Arrays.asList(output1, output2, output3));
 
@@ -125,12 +128,13 @@ public class OutputManagerTest {
     
     Input mockInput = strictMock(Input.class);
     InputMarker inputMarker = new InputMarker(mockInput, null, 0);
+    InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
     
     Output output1 = strictMock(Output.class);
     Output output2 = strictMock(Output.class);
     Output output3 = strictMock(Output.class);
     
-    expect(mockInput.getConfigs()).andReturn(Collections.<String, Object> emptyMap());
+    expect(mockInput.getInputDescriptor()).andReturn(inputDescriptor);
     expect(mockInput.getOutputList()).andReturn(Arrays.asList(output1, output2, output3));
     
     output1.write(jsonString, inputMarker); expectLastCall();

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
index 1872135..7c6aca2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
@@ -22,6 +22,7 @@ import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.output.spool.LogSpooler;
 import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -33,7 +34,6 @@ import static org.easymock.EasyMock.*;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-
 public class OutputS3FileTest {
 
   private Map<String, Object> configMap;
@@ -71,8 +71,11 @@ public class OutputS3FileTest {
 
     Input input = mock(Input.class);
     InputMarker inputMarker = new InputMarker(input, null, 0);
+    InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
+    inputDescriptor.setType("hdfs-namenode");
+    
     expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
-    expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
+    expect(input.getInputDescriptor()).andReturn(inputDescriptor);
     final LogSpooler spooler = mock(LogSpooler.class);
     spooler.add("log event block");
     final S3Uploader s3Uploader = mock(S3Uploader.class);
@@ -99,8 +102,11 @@ public class OutputS3FileTest {
   public void shouldReuseSpoolerForSamePath() throws Exception {
     Input input = mock(Input.class);
     InputMarker inputMarker = new InputMarker(input, null, 0);
+    InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
+    inputDescriptor.setType("hdfs-namenode");
+    
     expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
-    expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
+    expect(input.getInputDescriptor()).andReturn(inputDescriptor);
     final LogSpooler spooler = mock(LogSpooler.class);
     spooler.add("log event block1");
     spooler.add("log event block2");
@@ -169,8 +175,11 @@ public class OutputS3FileTest {
   public void shouldUploadFileOnRollover() throws Exception {
     Input input = mock(Input.class);
     InputMarker inputMarker = new InputMarker(input, null, 0);
+    InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
+    inputDescriptor.setType("hdfs-namenode");
+    
     expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
-    expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
+    expect(input.getInputDescriptor()).andReturn(inputDescriptor);
     final LogSpooler spooler = mock(LogSpooler.class);
     spooler.add("log event block1");
     final S3Uploader s3Uploader = mock(S3Uploader.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
index 1118233..44d91a9 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
@@ -21,7 +21,9 @@ package org.apache.ambari.logsearch.manager;
 
 import java.util.List;
 
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
 import org.apache.ambari.logsearch.configurer.LogSearchConfigConfigurer;
+import org.apache.ambari.logsearch.model.common.LSServerInputConfig;
 import org.apache.ambari.logsearch.model.common.LSServerLogLevelFilterMap;
 import org.apache.log4j.Logger;
 
@@ -50,8 +52,9 @@ public class ShipperConfigManager extends JsonManagerBase {
     return LogSearchConfigConfigurer.getConfig().getServices(clusterName);
   }
 
-  public String getInputConfig(String clusterName, String serviceName) {
-    return LogSearchConfigConfigurer.getConfig().getInputConfig(clusterName, serviceName);
+  public LSServerInputConfig getInputConfig(String clusterName, String serviceName) {
+    InputConfig inputConfig = LogSearchConfigConfigurer.getConfig().getInputConfig(clusterName, serviceName);
+    return new LSServerInputConfig(inputConfig);
   }
 
   public Response createInputConfig(String clusterName, String serviceName, String inputConfig) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerConditions.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerConditions.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerConditions.java
new file mode 100644
index 0000000..9cd9710
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerConditions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions;
+
+import io.swagger.annotations.ApiModel;
+
+@ApiModel
+public class LSServerConditions {
+  private LSServerFields fields;
+  
+  public LSServerConditions(Conditions conditions) {
+    this.fields = new LSServerFields(conditions.getFields());
+  }
+
+  public LSServerFields getFields() {
+    return fields;
+  }
+
+  public void setFields(LSServerFields fields) {
+    this.fields = fields;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFields.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFields.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFields.java
new file mode 100644
index 0000000..5f570da
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFields.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import java.util.Set;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields;
+
+import io.swagger.annotations.ApiModel;
+
+@ApiModel
+public class LSServerFields {
+  private Set<String> type;
+  
+  public LSServerFields(Fields fields) {
+    this.type = fields.getType();
+  }
+
+  public Set<String> getType() {
+    return type;
+  }
+
+  public void setType(Set<String> type) {
+    this.type = type;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilter.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilter.java
new file mode 100644
index 0000000..0190c01
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Base class of the server-side, JSON-serializable models of a {@link FilterDescriptor}.
+ * Properties that are {@code null} are left out of the serialized form
+ * (see {@code @JsonInclude(Include.NON_NULL)}).
+ */
+@ApiModel
+@JsonInclude(Include.NON_NULL)
+public abstract class LSServerFilter {
+  private String filter;
+  
+  private LSServerConditions conditions;
+  
+  @JsonProperty("sort_order")
+  private Integer sortOrder;
+  
+  // NOTE(review): unlike its neighbours this field has no snake_case @JsonProperty name - confirm
+  // whether that is intended before changing it, as it would alter the serialized property name.
+  private String sourceField;
+  
+  @JsonProperty("remove_source_field")
+  private Boolean removeSourceField;
+  
+  // Stays null when the descriptor defines no post map values, so it is omitted from the output.
+  private Map<String, List<LSServerPostMapValues>> postMapValues;
+  
+  @JsonProperty("is_enabled")
+  private Boolean isEnabled;
+
+  /**
+   * Copies the common filter properties from the given descriptor.
+   *
+   * @param filterDescriptor the descriptor to copy from; must not be {@code null}
+   */
+  public LSServerFilter(FilterDescriptor filterDescriptor) {
+    this.filter = filterDescriptor.getFilter();
+    this.conditions = new LSServerConditions(filterDescriptor.getConditions());
+    this.sortOrder = filterDescriptor.getSortOrder();
+    this.sourceField = filterDescriptor.getSourceField();
+    this.removeSourceField = filterDescriptor.isRemoveSourceField();
+    
+    // Post map values are optional: convert them only when present instead of failing with a
+    // NullPointerException on a filter that does not define any.
+    if (filterDescriptor.getPostMapValues() != null) {
+      postMapValues = new HashMap<String, List<LSServerPostMapValues>>();
+      for (Map.Entry<String, ? extends List<? extends PostMapValues>> e : filterDescriptor.getPostMapValues().entrySet()) {
+        List<LSServerPostMapValues> lsServerPostMapValues = new ArrayList<>();
+        for (PostMapValues pmv : e.getValue()) {
+          lsServerPostMapValues.add(new LSServerPostMapValues(pmv));
+        }
+        postMapValues.put(e.getKey(), lsServerPostMapValues);
+      }
+    }
+    
+    this.isEnabled = filterDescriptor.isEnabled();
+  }
+
+  public String getFilter() {
+    return filter;
+  }
+
+  public void setFilter(String filter) {
+    this.filter = filter;
+  }
+
+  public LSServerConditions getConditions() {
+    return conditions;
+  }
+
+  public void setConditions(LSServerConditions conditions) {
+    this.conditions = conditions;
+  }
+
+  public Integer getSortOrder() {
+    return sortOrder;
+  }
+
+  public void setSortOrder(Integer sortOrder) {
+    this.sortOrder = sortOrder;
+  }
+
+  public String getSourceField() {
+    return sourceField;
+  }
+
+  public void setSourceField(String sourceField) {
+    this.sourceField = sourceField;
+  }
+
+  public Boolean getRemoveSourceField() {
+    return removeSourceField;
+  }
+
+  public void setRemoveSourceField(Boolean removeSourceField) {
+    this.removeSourceField = removeSourceField;
+  }
+
+  /**
+   * @return the converted post map values keyed as in the descriptor, or {@code null} when the
+   *         descriptor had none
+   */
+  public Map<String, List<LSServerPostMapValues>> getPostMapValues() {
+    return postMapValues;
+  }
+
+  public void setPostMapValues(Map<String, List<LSServerPostMapValues>> postMapValues) {
+    this.postMapValues = postMapValues;
+  }
+
+  public Boolean getIsEnabled() {
+    return isEnabled;
+  }
+
+  public void setIsEnabled(Boolean isEnabled) {
+    this.isEnabled = isEnabled;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java
new file mode 100644
index 0000000..a8c4a7a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterGrok.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Server-side model of a grok filter: the common {@link LSServerFilter} properties plus the
+ * grok-specific {@code log4j_format}, {@code multiline_pattern} and {@code message_pattern}.
+ */
+@ApiModel
+public class LSServerFilterGrok extends LSServerFilter {
+  @JsonProperty("log4j_format")
+  private String log4jFormat;
+
+  @JsonProperty("multiline_pattern")
+  private String multilinePattern;
+
+  @JsonProperty("message_pattern")
+  private String messagePattern;
+
+  /**
+   * Copies the common filter properties and, when the descriptor is a
+   * {@link FilterGrokDescriptor}, the grok-specific ones as well.
+   *
+   * @param filterDescriptor the descriptor to copy from
+   */
+  public LSServerFilterGrok(FilterDescriptor filterDescriptor) {
+    super(filterDescriptor);
+    // Any other descriptor type leaves the three grok-specific fields null.
+    if (filterDescriptor instanceof FilterGrokDescriptor) {
+      FilterGrokDescriptor filterGrokDescriptor = (FilterGrokDescriptor)filterDescriptor;
+      this.log4jFormat = filterGrokDescriptor.getLog4jFormat();
+      this.multilinePattern = filterGrokDescriptor.getMultilinePattern();
+      this.messagePattern = filterGrokDescriptor.getMessagePattern();
+    }
+  }
+
+  public String getLog4jFormat() {
+    return log4jFormat;
+  }
+
+  public void setLog4jFormat(String log4jFormat) {
+    this.log4jFormat = log4jFormat;
+  }
+
+  public String getMultilinePattern() {
+    return multilinePattern;
+  }
+
+  public void setMultilinePattern(String multilinePattern) {
+    this.multilinePattern = multilinePattern;
+  }
+
+  public String getMessagePattern() {
+    return messagePattern;
+  }
+
+  public void setMessagePattern(String messagePattern) {
+    this.messagePattern = messagePattern;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterJson.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterJson.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterJson.java
new file mode 100644
index 0000000..3c0ed17
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterJson.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Server-side model of a JSON filter. It adds no properties of its own; everything it exposes
+ * comes from {@link LSServerFilter}.
+ */
+@ApiModel
+public class LSServerFilterJson extends LSServerFilter {
+  /**
+   * @param filterDescriptor the descriptor whose common filter properties are copied by the
+   *                         superclass constructor
+   */
+  public LSServerFilterJson(FilterDescriptor filterDescriptor) {
+    super(filterDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterKeyValue.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterKeyValue.java
new file mode 100644
index 0000000..dcee25d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerFilterKeyValue.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Server-side model of a key-value filter: the common {@link LSServerFilter} properties plus the
+ * key-value specific {@code field_split}, {@code value_split} and {@code value_borders}.
+ */
+@ApiModel
+public class LSServerFilterKeyValue extends LSServerFilter {
+  @JsonProperty("field_split")
+  private String fieldSplit;
+
+  @JsonProperty("value_split")
+  private String valueSplit;
+
+  @JsonProperty("value_borders")
+  private String valueBorders;
+
+  /**
+   * Copies the common filter properties and, when the descriptor is a
+   * {@link FilterKeyValueDescriptor}, the key-value specific ones as well.
+   *
+   * @param filterDescriptor the descriptor to copy from
+   */
+  public LSServerFilterKeyValue(FilterDescriptor filterDescriptor) {
+    super(filterDescriptor);
+    // Guarded like LSServerFilterGrok: a descriptor of another type leaves the key-value fields
+    // null instead of failing with a ClassCastException.
+    if (filterDescriptor instanceof FilterKeyValueDescriptor) {
+      FilterKeyValueDescriptor filterKeyValueDescriptor = (FilterKeyValueDescriptor)filterDescriptor;
+      this.fieldSplit = filterKeyValueDescriptor.getFieldSplit();
+      this.valueSplit = filterKeyValueDescriptor.getValueSplit();
+      this.valueBorders = filterKeyValueDescriptor.getValueBorders();
+    }
+  }
+
+  public String getFieldSplit() {
+    return fieldSplit;
+  }
+
+  public void setFieldSplit(String fieldSplit) {
+    this.fieldSplit = fieldSplit;
+  }
+
+  public String getValueSplit() {
+    return valueSplit;
+  }
+
+  public void setValueSplit(String valueSplit) {
+    this.valueSplit = valueSplit;
+  }
+
+  public String getValueBorders() {
+    return valueBorders;
+  }
+
+  public void setValueBorders(String valueBorders) {
+    this.valueBorders = valueBorders;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java
new file mode 100644
index 0000000..fe83fe4
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInput.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.annotations.ApiModel;
+
+/**
+ * Base class of the server-side, JSON-serializable models of an {@link InputDescriptor}.
+ * All fields are final and assigned once in the constructor; properties that are {@code null}
+ * are left out of the serialized form (see {@code @JsonInclude(Include.NON_NULL)}).
+ */
+@ApiModel
+@JsonInclude(Include.NON_NULL)
+public abstract class LSServerInput {
+  private final String type;
+  private final String rowtype;
+  private final String path;
+  
+  // The map returned by the descriptor is stored directly; no defensive copy is made.
+  @JsonProperty("add_fields")
+  private final Map<String, String> addFields;
+  
+  private final String source;
+  private final Boolean tail;
+  
+  @JsonProperty("gen_event_md5")
+  private final Boolean genEventMd5;
+  
+  @JsonProperty("use_event_md5_as_id")
+  private final Boolean useEventMd5AsId;
+  
+  @JsonProperty("start_position")
+  private final String startPosition;
+  
+  @JsonProperty("cache_enabled")
+  private final Boolean cacheEnabled;
+  
+  @JsonProperty("cache_key_field")
+  private final String cacheKeyField;
+  
+  @JsonProperty("cache_last_dedup_enabled")
+  private final Boolean cacheLastDedupEnabled;
+  
+  @JsonProperty("cache_size")
+  private final Integer cacheSize;
+  
+  @JsonProperty("cache_dedup_interval")
+  private final Long cacheDedupInterval;
+  
+  @JsonProperty("is_enabled")
+  private final Boolean isEnabled;
+  
+  /**
+   * Copies every common input property from the given descriptor, one getter per field.
+   *
+   * @param inputDescriptor the descriptor to copy from; it is dereferenced without a null check
+   */
+  public LSServerInput(InputDescriptor inputDescriptor) {
+    this.type = inputDescriptor.getType();
+    this.rowtype = inputDescriptor.getRowtype();
+    this.path = inputDescriptor.getPath();
+    this.addFields = inputDescriptor.getAddFields();
+    this.source = inputDescriptor.getSource();
+    this.tail = inputDescriptor.isTail();
+    this.genEventMd5 = inputDescriptor.isGenEventMd5();
+    this.useEventMd5AsId = inputDescriptor.isUseEventMd5AsId();
+    this.startPosition = inputDescriptor.getStartPosition();
+    this.cacheEnabled = inputDescriptor.isCacheEnabled();
+    this.cacheKeyField = inputDescriptor.getCacheKeyField();
+    this.cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled();
+    this.cacheSize = inputDescriptor.getCacheSize();
+    this.cacheDedupInterval = inputDescriptor.getCacheDedupInterval();
+    this.isEnabled = inputDescriptor.isEnabled();
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public String getRowtype() {
+    return rowtype;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public Map<String, String> getAddFields() {
+    return addFields;
+  }
+
+  public String getSource() {
+    return source;
+  }
+
+  public Boolean getTail() {
+    return tail;
+  }
+
+  public Boolean getGenEventMd5() {
+    return genEventMd5;
+  }
+
+  public Boolean getUseEventMd5AsId() {
+    return useEventMd5AsId;
+  }
+
+  public String getStartPosition() {
+    return startPosition;
+  }
+
+  public Boolean getCacheEnabled() {
+    return cacheEnabled;
+  }
+
+  public String getCacheKeyField() {
+    return cacheKeyField;
+  }
+
+  public Boolean getCacheLastDedupEnabled() {
+    return cacheLastDedupEnabled;
+  }
+
+  public Integer getCacheSize() {
+    return cacheSize;
+  }
+
+  public Long getCacheDedupInterval() {
+    return cacheDedupInterval;
+  }
+
+  public Boolean getIsEnabled() {
+    return isEnabled;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java
new file mode 100644
index 0000000..e3dc0d1
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.model.common;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterJsonDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+/**
+ * Server-side, Swagger-documented model of a whole {@link InputConfig}: the list of inputs and
+ * the list of filters, each converted to the matching {@code LSServer*} model class.
+ */
+@ApiModel
+public class LSServerInputConfig {
+  @ApiModelProperty
+  private List<LSServerInput> input;
+  
+  @ApiModelProperty
+  private List<LSServerFilter> filter;
+  
+  /**
+   * Converts every input and filter descriptor of the configuration to its server-side model.
+   * Descriptors of a type not handled below are skipped.
+   *
+   * @param inputConfig the configuration to convert; its input and filter lists are iterated
+   *                    without a null check
+   */
+  public LSServerInputConfig(InputConfig inputConfig) {
+    input = new ArrayList<>();
+    for (InputDescriptor inputDescriptor : inputConfig.getInput()) {
+      // The more specific S3 type is tested first. NOTE(review): if InputS3FileDescriptor is a
+      // subtype of InputFileBaseDescriptor (the hierarchy is declared elsewhere - confirm), testing
+      // the base type first would make the S3 branch unreachable and render S3 inputs as plain files.
+      if (inputDescriptor instanceof InputS3FileDescriptor) {
+        LSServerInput inputItem = new LSServerInputS3File(inputDescriptor);
+        input.add(inputItem);
+      } else if (inputDescriptor instanceof InputFileBaseDescriptor) {
+        LSServerInput inputItem = new LSServerInputFile(inputDescriptor);
+        input.add(inputItem);
+      }
+    }
+    
+    filter = new ArrayList<>();
+    for (FilterDescriptor filterDescriptor : inputConfig.getFilter()) {
+      if (filterDescriptor instanceof FilterGrokDescriptor) {
+        LSServerFilter filterItem = new LSServerFilterGrok(filterDescriptor);
+        filter.add(filterItem);
+      } else if (filterDescriptor instanceof FilterKeyValueDescriptor) {
+        LSServerFilter filterItem = new LSServerFilterKeyValue(filterDescriptor);
+        filter.add(filterItem);
+      } else if (filterDescriptor instanceof FilterJsonDescriptor) {
+        LSServerFilter filterItem = new LSServerFilterJson(filterDescriptor);
+        filter.add(filterItem);
+      }
+    }
+  }
+
+  public List<LSServerInput> getInput() {
+    return input;
+  }
+
+  public void setInput(List<LSServerInput> input) {
+    this.input = input;
+  }
+
+  public List<LSServerFilter> getFilter() {
+    return filter;
+  }
+
+  public void setFilter(List<LSServerFilter> filter) {
+    this.filter = filter;
+  }
+}


[4/4] ambari git commit: AMBARI-21033 Log Search use POJOs for input configuration (mgergely)

Posted by mg...@apache.org.
AMBARI-21033 Log Search use POJOs for input configuration (mgergely)

Change-Id: Ibf28c16309cf3ced0f0eea69d832ecd8accd2d62


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fd4a7a46
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fd4a7a46
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fd4a7a46

Branch: refs/heads/trunk
Commit: fd4a7a46a2db9869dca28294660ca40e693504ea
Parents: cbb1e90
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Mon May 22 12:49:50 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Mon May 22 12:49:50 2017 +0200

----------------------------------------------------------------------
 .../ambari-logsearch-config-api/pom.xml         |  14 +-
 .../config/api/InputConfigMonitor.java          |  13 +-
 .../logsearch/config/api/LogSearchConfig.java   |   3 +-
 .../config/api/LogSearchConfigFactory.java      |  10 +-
 .../api/model/inputconfig/Conditions.java       |  24 +++
 .../config/api/model/inputconfig/Fields.java    |  26 +++
 .../api/model/inputconfig/FilterDescriptor.java |  39 ++++
 .../model/inputconfig/FilterGrokDescriptor.java |  28 +++
 .../model/inputconfig/FilterJsonDescriptor.java |  23 +++
 .../inputconfig/FilterKeyValueDescriptor.java   |  28 +++
 .../api/model/inputconfig/InputConfig.java      |  28 +++
 .../api/model/inputconfig/InputDescriptor.java  |  54 +++++
 .../inputconfig/InputFileBaseDescriptor.java    |  28 +++
 .../model/inputconfig/InputFileDescriptor.java  |  23 +++
 .../inputconfig/InputS3FileDescriptor.java      |  26 +++
 .../model/inputconfig/MapDateDescriptor.java    |  26 +++
 .../inputconfig/MapFieldCopyDescriptor.java     |  24 +++
 .../model/inputconfig/MapFieldDescriptor.java   |  24 +++
 .../inputconfig/MapFieldNameDescriptor.java     |  24 +++
 .../inputconfig/MapFieldValueDescriptor.java    |  26 +++
 .../api/model/inputconfig/PostMapValues.java    |  26 +++
 .../config/api/LogSearchConfigClass1.java       |   3 +-
 .../config/api/LogSearchConfigClass2.java       |   3 +-
 .../ambari-logsearch-config-zookeeper/pom.xml   |   6 +
 .../config/zookeeper/LogSearchConfigZK.java     |  72 ++++++-
 .../model/inputconfig/impl/ConditionsImpl.java  |  37 ++++
 .../model/inputconfig/impl/FieldsImpl.java      |  39 ++++
 .../model/inputconfig/impl/FilterAdapter.java   |  42 ++++
 .../inputconfig/impl/FilterDescriptorImpl.java  | 113 ++++++++++
 .../impl/FilterGrokDescriptorImpl.java          |  66 ++++++
 .../impl/FilterJsonDescriptorImpl.java          |  25 +++
 .../impl/FilterKeyValueDescriptorImpl.java      |  63 ++++++
 .../model/inputconfig/impl/InputAdapter.java    |  58 ++++++
 .../model/inputconfig/impl/InputConfigGson.java |  46 +++++
 .../model/inputconfig/impl/InputConfigImpl.java |  54 +++++
 .../inputconfig/impl/InputDescriptorImpl.java   | 204 +++++++++++++++++++
 .../impl/InputFileBaseDescriptorImpl.java       |  66 ++++++
 .../impl/InputFileDescriptorImpl.java           |  25 +++
 .../impl/InputS3FileDescriptorImpl.java         |  53 +++++
 .../inputconfig/impl/MapDateDescriptorImpl.java |  58 ++++++
 .../impl/MapFieldCopyDescriptorImpl.java        |  45 ++++
 .../impl/MapFieldNameDescriptorImpl.java        |  45 ++++
 .../impl/MapFieldValueDescriptorImpl.java       |  58 ++++++
 .../inputconfig/impl/PostMapValuesAdapter.java  |  99 +++++++++
 .../inputconfig/impl/PostMapValuesImpl.java     |  40 ++++
 .../org/apache/ambari/logfeeder/LogFeeder.java  |   2 +-
 .../ambari/logfeeder/common/ConfigBlock.java    | 107 +---------
 .../ambari/logfeeder/common/ConfigHandler.java  | 126 ++++++------
 .../ambari/logfeeder/common/ConfigItem.java     |  97 +++++++++
 .../apache/ambari/logfeeder/filter/Filter.java  |  53 +++--
 .../ambari/logfeeder/filter/FilterGrok.java     |  11 +-
 .../ambari/logfeeder/filter/FilterJSON.java     |   3 -
 .../ambari/logfeeder/filter/FilterKeyValue.java |  12 +-
 .../logfeeder/input/AbstractInputFile.java      |  16 +-
 .../apache/ambari/logfeeder/input/Input.java    | 112 ++++++----
 .../ambari/logfeeder/input/InputFile.java       |   6 +-
 .../ambari/logfeeder/input/InputS3File.java     |   5 +-
 .../ambari/logfeeder/input/InputSimulate.java   |  23 +--
 .../logfeeder/loglevelfilter/FilterLogData.java |   2 +-
 .../apache/ambari/logfeeder/mapper/Mapper.java  |   4 +-
 .../ambari/logfeeder/mapper/MapperDate.java     |  15 +-
 .../logfeeder/mapper/MapperFieldCopy.java       |  13 +-
 .../logfeeder/mapper/MapperFieldName.java       |  14 +-
 .../logfeeder/mapper/MapperFieldValue.java      |  14 +-
 .../apache/ambari/logfeeder/output/Output.java  |   3 -
 .../logfeeder/output/OutputLineFilter.java      |   2 +-
 .../ambari/logfeeder/output/OutputManager.java  |   8 +-
 .../ambari/logfeeder/output/OutputS3File.java   |  96 ++++-----
 .../ambari/logfeeder/util/LogFeederUtil.java    |  51 -----
 .../ambari/logfeeder/filter/FilterGrokTest.java |  37 ++--
 .../ambari/logfeeder/filter/FilterJSONTest.java |  14 +-
 .../logfeeder/filter/FilterKeyValueTest.java    |  41 ++--
 .../ambari/logfeeder/input/InputFileTest.java   |  22 +-
 .../logconfig/LogConfigHandlerTest.java         |  18 +-
 .../ambari/logfeeder/mapper/MapperDateTest.java |  44 ++--
 .../logfeeder/mapper/MapperFieldCopyTest.java   |  19 +-
 .../logfeeder/mapper/MapperFieldNameTest.java   |  19 +-
 .../logfeeder/mapper/MapperFieldValueTest.java  |  29 +--
 .../logfeeder/output/OutputLineFilterTest.java  |  22 +-
 .../logfeeder/output/OutputManagerTest.java     |  10 +-
 .../logfeeder/output/OutputS3FileTest.java      |  17 +-
 .../logsearch/manager/ShipperConfigManager.java |   7 +-
 .../model/common/LSServerConditions.java        |  41 ++++
 .../logsearch/model/common/LSServerFields.java  |  43 ++++
 .../logsearch/model/common/LSServerFilter.java  | 130 ++++++++++++
 .../model/common/LSServerFilterGrok.java        |  73 +++++++
 .../model/common/LSServerFilterJson.java        |  31 +++
 .../model/common/LSServerFilterKeyValue.java    |  71 +++++++
 .../logsearch/model/common/LSServerInput.java   | 149 ++++++++++++++
 .../model/common/LSServerInputConfig.java       |  87 ++++++++
 .../model/common/LSServerInputFile.java         |  31 +++
 .../model/common/LSServerInputFileBase.java     |  72 +++++++
 .../model/common/LSServerInputS3File.java       |  59 ++++++
 .../logsearch/model/common/LSServerMapDate.java |  61 ++++++
 .../model/common/LSServerMapField.java          |  30 +++
 .../model/common/LSServerMapFieldCopy.java      |  49 +++++
 .../model/common/LSServerMapFieldName.java      |  49 +++++
 .../model/common/LSServerMapFieldValue.java     |  61 ++++++
 .../model/common/LSServerPostMapValues.java     |  63 ++++++
 .../common/LSServerPostMapValuesSerializer.java |  39 ++++
 .../logsearch/rest/ShipperConfigResource.java   |   6 +-
 .../test-config/logfeeder/logfeeder.properties  |   1 +
 .../test-config/logsearch/logsearch.properties  |   4 -
 103 files changed, 3460 insertions(+), 621 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/pom.xml b/ambari-logsearch/ambari-logsearch-config-api/pom.xml
index 72fcc80..5355906 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-config-api/pom.xml
@@ -41,17 +41,9 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <version>3.4</version>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.7</version>
     </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
index 29a82a6..746c14c 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
@@ -19,19 +19,28 @@
 
 package org.apache.ambari.logsearch.config.api;
 
+import java.util.List;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+
 /**
  * Monitors input configuration changes.
  */
 public interface InputConfigMonitor {
   /**
+   * @return A list of json strings for all the global config jsons.
+   */
+  List<String> getGlobalConfigJsons();
+  
+  /**
    * Notification of a new input configuration.
    * 
    * @param serviceName The name of the service for which the input configuration was created.
    * @param inputConfig The input configuration.
    * @throws Exception
    */
-  void loadInputConfigs(String serviceName, String inputConfig) throws Exception;
-
+  void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception;
+  
   /**
    * Notification of the removal of an input configuration.
    * 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
index 07921d0..4cbf21f 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
 
 /**
  * Log Search Configuration, which uploads, retrieves configurations, and monitors it's changes.
@@ -71,7 +72,7 @@ public interface LogSearchConfig extends Closeable {
    * @param serviceName The name of the service looked for.
    * @return The input configuration for the service if it exists, null otherwise.
    */
-  String getInputConfig(String clusterName, String serviceName);
+  InputConfig getInputConfig(String clusterName, String serviceName);
 
   /**
    * Uploads the input configuration for a service in a cluster.

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
index 6ef4b90..947e7e7 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
@@ -22,14 +22,14 @@ package org.apache.ambari.logsearch.config.api;
 import java.util.Map;
 
 import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Factory class for LogSearchConfig.
  */
 public class LogSearchConfigFactory {
-  private static final Logger LOG = Logger.getLogger(LogSearchConfigFactory.class);
+  private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigFactory.class);
 
   /**
    * Creates a Log Search Configuration instance that implements {@link org.apache.ambari.logsearch.config.api.LogSearchConfig}.
@@ -47,7 +47,7 @@ public class LogSearchConfigFactory {
     try {
       LogSearchConfig logSearchConfig = null;
       String configClassName = properties.get("logsearch.config.class");
-      if (!StringUtils.isBlank(configClassName)) {
+      if (configClassName != null && !"".equals(configClassName.trim())) {
         Class<?> clazz = Class.forName(configClassName);
         if (LogSearchConfig.class.isAssignableFrom(clazz)) {
           logSearchConfig = (LogSearchConfig) clazz.newInstance();
@@ -61,7 +61,7 @@ public class LogSearchConfigFactory {
       logSearchConfig.init(component, properties);
       return logSearchConfig;
     } catch (Exception e) {
-      LOG.fatal("Could not initialize logsearch config.", e);
+      LOG.error("Could not initialize logsearch config.", e);
       throw e;
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/Conditions.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/Conditions.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/Conditions.java
new file mode 100644
index 0000000..4da400a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/Conditions.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface Conditions {
+  Fields getFields();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/Fields.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/Fields.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/Fields.java
new file mode 100644
index 0000000..5d34b1e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/Fields.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+import java.util.Set;
+
+public interface Fields {
+  Set<String> getType();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterDescriptor.java
new file mode 100644
index 0000000..632c6cb
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterDescriptor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+import java.util.List;
+import java.util.Map;
+
+public interface FilterDescriptor {
+  String getFilter();
+
+  Conditions getConditions();
+
+  Integer getSortOrder();
+
+  String getSourceField();
+
+  Boolean isRemoveSourceField();
+
+  Map<String, ? extends List<? extends PostMapValues>> getPostMapValues();
+
+  Boolean isEnabled();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java
new file mode 100644
index 0000000..e85ce97
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterGrokDescriptor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface FilterGrokDescriptor extends FilterDescriptor {
+  String getLog4jFormat();
+
+  String getMultilinePattern();
+
+  String getMessagePattern();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterJsonDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterJsonDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterJsonDescriptor.java
new file mode 100644
index 0000000..08f1893
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterJsonDescriptor.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface FilterJsonDescriptor extends FilterDescriptor {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterKeyValueDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterKeyValueDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterKeyValueDescriptor.java
new file mode 100644
index 0000000..6edd140
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/FilterKeyValueDescriptor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface FilterKeyValueDescriptor extends FilterDescriptor {
+  String getFieldSplit();
+
+  String getValueSplit();
+
+  String getValueBorders();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputConfig.java
new file mode 100644
index 0000000..8126ac9
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputConfig.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+import java.util.List;
+
+public interface InputConfig {
+  List<? extends InputDescriptor> getInput();
+
+  List<? extends FilterDescriptor> getFilter();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
new file mode 100644
index 0000000..c41da93
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+import java.util.Map;
+
+public interface InputDescriptor {
+  String getType();
+
+  String getRowtype();
+
+  String getPath();
+
+  Map<String, String> getAddFields();
+
+  String getSource();
+
+  Boolean isTail();
+
+  Boolean isGenEventMd5();
+
+  Boolean isUseEventMd5AsId();
+
+  String getStartPosition();
+
+  Boolean isCacheEnabled();
+
+  String getCacheKeyField();
+
+  Boolean getCacheLastDedupEnabled();
+
+  Integer getCacheSize();
+
+  Long getCacheDedupInterval();
+
+  Boolean isEnabled();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileBaseDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileBaseDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileBaseDescriptor.java
new file mode 100644
index 0000000..a393dc7
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileBaseDescriptor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface InputFileBaseDescriptor extends InputDescriptor {
+  Boolean getProcessFile();
+
+  Boolean getCopyFile();
+
+  Integer getCheckpointIntervalMs();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
new file mode 100644
index 0000000..0070ad9
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface InputFileDescriptor extends InputFileBaseDescriptor {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java
new file mode 100644
index 0000000..b075629
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface InputS3FileDescriptor extends InputFileBaseDescriptor {
+  String getS3AccessKey();
+
+  String getS3SecretKey();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java
new file mode 100644
index 0000000..f88435f
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapDateDescriptor.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface MapDateDescriptor extends MapFieldDescriptor {
+  String getSourceDatePattern();
+
+  String getTargetDatePattern();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldCopyDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldCopyDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldCopyDescriptor.java
new file mode 100644
index 0000000..596c173
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldCopyDescriptor.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface MapFieldCopyDescriptor extends MapFieldDescriptor {
+  String getCopyName();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java
new file mode 100644
index 0000000..db086c5
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldDescriptor.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface MapFieldDescriptor {
+  String getJsonName();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldNameDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldNameDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldNameDescriptor.java
new file mode 100644
index 0000000..da8cd0d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldNameDescriptor.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface MapFieldNameDescriptor extends MapFieldDescriptor {
+  String getNewFieldName();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java
new file mode 100644
index 0000000..cf37e62
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/MapFieldValueDescriptor.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+/** Map-field descriptor variant that exposes a pre-value / post-value pair of strings. */
+public interface MapFieldValueDescriptor extends MapFieldDescriptor {
+  String getPreValue();
+  String getPostValue();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/PostMapValues.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/PostMapValues.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/PostMapValues.java
new file mode 100644
index 0000000..5be7287
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/PostMapValues.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+import java.util.List;
+
+public interface PostMapValues { // read-only view over a list of map-field descriptors
+  List<MapFieldDescriptor> getMappers(); // NOTE(review): whether this may be null or empty is not visible here -- confirm with implementations
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
index fc3fe5b..d7e3c0a 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
@@ -26,6 +26,7 @@ import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
 
 public class LogSearchConfigClass1 implements LogSearchConfig {
   @Override
@@ -52,7 +53,7 @@ public class LogSearchConfigClass1 implements LogSearchConfig {
   }
 
   @Override
-  public String getInputConfig(String clusterName, String serviceName) {
+  public InputConfig getInputConfig(String clusterName, String serviceName) {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
index 346edb3..198c133 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
@@ -26,6 +26,7 @@ import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
 
 public class LogSearchConfigClass2 implements LogSearchConfig {
   @Override
@@ -52,7 +53,7 @@ public class LogSearchConfigClass2 implements LogSearchConfig {
   }
 
   @Override
-  public String getInputConfig(String clusterName, String serviceName) {
+  public InputConfig getInputConfig(String clusterName, String serviceName) {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
index 2c59a4a..7ecda60 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
@@ -71,8 +71,14 @@
       <version>2.12.0</version>
     </dependency>
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.7</version>
+    </dependency>
+    <dependency>
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
+      <version>2.6.2</version>
     </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
index 5e22374..4d10a5b 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -27,6 +27,10 @@ import java.util.TreeMap;
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputAdapter;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
 import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
 import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
 import org.apache.commons.collections.MapUtils;
@@ -40,18 +44,23 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.ZKPaths;
-import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Splitter;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 
 public class LogSearchConfigZK implements LogSearchConfig {
-  private static final Logger LOG = Logger.getLogger(LogSearchConfigZK.class);
+  private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigZK.class);
 
   private static final int SESSION_TIMEOUT = 15000;
   private static final int CONNECTION_TIMEOUT = 30000;
@@ -129,7 +138,16 @@ public class LogSearchConfigZK implements LogSearchConfig {
 
   @Override
   public void monitorInputConfigChanges(final InputConfigMonitor inputConfigMonitor,
-      final LogLevelFilterMonitor logLevelFilterMonitor ) throws Exception {
+      final LogLevelFilterMonitor logLevelFilterMonitor) throws Exception {
+    final JsonParser parser = new JsonParser();
+    final JsonArray globalConfigNode = new JsonArray();
+    for (String globalConfigJsonString : inputConfigMonitor.getGlobalConfigJsons()) {
+      JsonElement globalConfigJson = parser.parse(globalConfigJsonString);
+      globalConfigNode.add(globalConfigJson.getAsJsonObject().get("global"));
+    }
+    
+    createGlobalConfigNode(globalConfigNode);
+    
     TreeCacheListener listener = new TreeCacheListener() {
       public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
         String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
@@ -171,7 +189,16 @@ public class LogSearchConfigZK implements LogSearchConfig {
 
       private void addInputs(String serviceName, String inputConfig) {
         try {
-          inputConfigMonitor.loadInputConfigs(serviceName, inputConfig);
+          JsonElement inputConfigJson = parser.parse(inputConfig);
+          for (Map.Entry<String, JsonElement> typeEntry : inputConfigJson.getAsJsonObject().entrySet()) {
+            for (JsonElement e : typeEntry.getValue().getAsJsonArray()) {
+              for (JsonElement globalConfig : globalConfigNode) {
+                merge(globalConfig.getAsJsonObject(), e.getAsJsonObject());
+              }
+            }
+          }
+          
+          inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
         } catch (Exception e) {
           LOG.error("Could not load input configuration for service " + serviceName + ":\n" + inputConfig, e);
         }
@@ -193,11 +220,39 @@ public class LogSearchConfigZK implements LogSearchConfig {
             break;
         }
       }
+
+      /** Recursively copies into target every entry of source that target lacks; values already present in target win. */
+      private void merge(JsonObject source, JsonObject target) {
+        for (Map.Entry<String, JsonElement> e : source.entrySet()) {
+          JsonElement existing = target.get(e.getKey());
+          if (existing == null) {
+            target.add(e.getKey(), e.getValue());
+          } else if (e.getValue().isJsonObject() && existing.isJsonObject()) {
+            // Descend only when both sides are objects; a non-object value in target is kept instead of throwing.
+            merge(e.getValue().getAsJsonObject(), existing.getAsJsonObject());
+          }
+        }
+      }
     };
     cache.getListenable().addListener(listener);
     cache.start();
   }
 
+  private void createGlobalConfigNode(JsonArray globalConfigNode) {
+    String globalConfigNodePath = String.format("%s/%s/global", root, properties.get(CLUSTER_NAME_PROPERTY));
+    String data = InputConfigGson.gson.toJson(globalConfigNode);
+    // Overwrite the node when the cache already holds it, otherwise create it together with its parents and the ACLs.
+    try {
+      if (cache.getCurrentData(globalConfigNodePath) != null) {
+        client.setData().forPath(globalConfigNodePath, data.getBytes()); // NOTE(review): getBytes() uses the platform default charset
+      } else {
+        client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(globalConfigNodePath, data.getBytes());
+      }
+    } catch (Exception e) { // best effort: a failed create/update is logged and not propagated to the caller
+      LOG.warn("Exception during global config node creation/update", e);
+    }
+  }
+
   @Override
   public List<String> getServices(String clusterName) {
     String parentPath = String.format("%s/%s/input", root, clusterName);
@@ -206,9 +261,14 @@ public class LogSearchConfigZK implements LogSearchConfig {
   }
 
   @Override
-  public String getInputConfig(String clusterName, String serviceName) {
+  public InputConfig getInputConfig(String clusterName, String serviceName) {
+    String globalConfigNodePath = String.format("%s/%s/global", root, clusterName);
+    // The cache may not hold a global node for this cluster; use an empty array then instead of failing with an NPE.
+    ChildData globalConfigNode = cache.getCurrentData(globalConfigNodePath);
+    InputAdapter.setGlobalConfigs(globalConfigNode == null ? new JsonArray() :
+        (JsonArray) new JsonParser().parse(new String(globalConfigNode.getData())));
     ChildData childData = cache.getCurrentData(String.format("%s/%s/input/%s", root, clusterName, serviceName));
-    return childData == null ? null : new String(childData.getData());
+    return childData == null ? null : InputConfigGson.gson.fromJson(new String(childData.getData()), InputConfigImpl.class);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/ConditionsImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/ConditionsImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/ConditionsImpl.java
new file mode 100644
index 0000000..8bbff8f
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/ConditionsImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions;
+
+import com.google.gson.annotations.Expose;
+
+public class ConditionsImpl implements Conditions { // mutable bean implementing Conditions; wraps a single FieldsImpl
+  @Expose
+  private FieldsImpl fields; // no initializer: stays null until setFields() is called or a deserializer fills it
+
+  public FieldsImpl getFields() {
+    return fields;
+  }
+
+  public void setFields(FieldsImpl fields) {
+    this.fields = fields;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FieldsImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FieldsImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FieldsImpl.java
new file mode 100644
index 0000000..68cd0e2
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FieldsImpl.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.util.Set;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields;
+
+import com.google.gson.annotations.Expose;
+
+public class FieldsImpl implements Fields { // mutable bean implementing Fields; wraps a set of type strings
+  @Expose
+  private Set<String> type; // no initializer: stays null until setType() is called or a deserializer fills it
+
+  public Set<String> getType() {
+    return type; // returns the internal set itself, not a copy
+  }
+
+  public void setType(Set<String> type) {
+    this.type = type;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterAdapter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterAdapter.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterAdapter.java
new file mode 100644
index 0000000..b84403b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterAdapter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.lang.reflect.Type;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+
+public class FilterAdapter implements JsonDeserializer<FilterDescriptorImpl> { // picks the concrete descriptor class from "filter"
+  @Override
+  public FilterDescriptorImpl deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) {
+    JsonElement filter = json.getAsJsonObject().get("filter");
+    String type = filter == null || filter.isJsonNull() ? null : filter.getAsString(); // null when "filter" is absent
+    if ("grok".equals(type)) {
+      return context.deserialize(json, FilterGrokDescriptorImpl.class);
+    } else if ("keyvalue".equals(type)) {
+      return context.deserialize(json, FilterKeyValueDescriptorImpl.class);
+    } else if ("json".equals(type)) {
+      return context.deserialize(json, FilterJsonDescriptorImpl.class);
+    }
+    throw new IllegalArgumentException("Unknown filter type: " + type); // also covers a missing type (was a bare NPE)
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterDescriptorImpl.java
new file mode 100644
index 0000000..4e11715
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterDescriptorImpl.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public abstract class FilterDescriptorImpl implements FilterDescriptor { // shared mutable state for the concrete filter descriptors
+  @Expose
+  private String filter;
+
+  @Expose
+  private ConditionsImpl conditions;
+
+  @Expose
+  @SerializedName("sort_order")
+  private Integer sortOrder; // boxed: null when never set
+
+  @Expose
+  @SerializedName("source_field")
+  private String sourceField;
+
+  @Expose
+  @SerializedName("remove_source_field")
+  private Boolean removeSourceField; // boxed: null when never set
+
+  @Expose
+  @SerializedName("post_map_values")
+  private Map<String, List<PostMapValuesImpl>> postMapValues;
+
+  @Expose
+  @SerializedName("is_enabled")
+  private Boolean isEnabled; // boxed: null when never set
+
+  public String getFilter() {
+    return filter;
+  }
+
+  public void setFilter(String filter) {
+    this.filter = filter;
+  }
+
+  public ConditionsImpl getConditions() {
+    return conditions;
+  }
+
+  public void setConditions(ConditionsImpl conditions) {
+    this.conditions = conditions;
+  }
+
+  public Integer getSortOrder() {
+    return sortOrder;
+  }
+
+  public void setSortOrder(Integer sortOrder) {
+    this.sortOrder = sortOrder;
+  }
+
+  public String getSourceField() {
+    return sourceField;
+  }
+
+  public void setSourceField(String sourceField) {
+    this.sourceField = sourceField;
+  }
+
+  public Boolean isRemoveSourceField() { // may return null; callers that unbox must check first
+    return removeSourceField;
+  }
+
+  public void setRemoveSourceField(Boolean removeSourceField) {
+    this.removeSourceField = removeSourceField;
+  }
+
+  public Map<String, ? extends List<? extends PostMapValues>> getPostMapValues() { // exposes the Impl-typed map through API types
+    return postMapValues;
+  }
+
+  public void setPostMapValues(Map<String, List<PostMapValuesImpl>> postMapValues) {
+    this.postMapValues = postMapValues;
+  }
+
+  public Boolean isEnabled() { // may return null; callers that unbox must check first
+    return isEnabled;
+  }
+
+  public void setIsEnabled(Boolean isEnabled) {
+    this.isEnabled = isEnabled;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java
new file mode 100644
index 0000000..7f40b7f
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterGrokDescriptorImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public class FilterGrokDescriptorImpl extends FilterDescriptorImpl implements FilterGrokDescriptor { // adds three pattern strings
+  @Expose
+  @SerializedName("log4j_format")
+  private String log4jFormat; // null when never set
+
+  @Expose
+  @SerializedName("multiline_pattern")
+  private String multilinePattern; // null when never set
+
+  @Expose
+  @SerializedName("message_pattern")
+  private String messagePattern; // null when never set
+
+  @Override
+  public String getLog4jFormat() {
+    return log4jFormat;
+  }
+
+  public void setLog4jFormat(String log4jFormat) {
+    this.log4jFormat = log4jFormat;
+  }
+
+  @Override
+  public String getMultilinePattern() {
+    return multilinePattern;
+  }
+
+  public void setMultilinePattern(String multilinePattern) {
+    this.multilinePattern = multilinePattern;
+  }
+
+  @Override
+  public String getMessagePattern() {
+    return messagePattern;
+  }
+
+  public void setMessagePattern(String messagePattern) {
+    this.messagePattern = messagePattern;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterJsonDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterJsonDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterJsonDescriptorImpl.java
new file mode 100644
index 0000000..9bf1a2b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterJsonDescriptorImpl.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterJsonDescriptor;
+
+public class FilterJsonDescriptorImpl extends FilterDescriptorImpl implements FilterJsonDescriptor { // adds no members of its own
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterKeyValueDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterKeyValueDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterKeyValueDescriptorImpl.java
new file mode 100644
index 0000000..8e89990
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/FilterKeyValueDescriptorImpl.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Implementation of {@link FilterKeyValueDescriptor} adding three Gson-exposed string properties on top of
+ * {@link FilterDescriptorImpl}: {@code field_split}, {@code value_split} and {@code value_borders}.
+ * All of them are {@code null} until assigned.
+ */
+public class FilterKeyValueDescriptorImpl extends FilterDescriptorImpl implements FilterKeyValueDescriptor {
+  @Expose
+  @SerializedName("field_split")
+  private String fieldSplit;
+
+  @Expose
+  @SerializedName("value_split")
+  private String valueSplit;
+
+  @Expose
+  @SerializedName("value_borders")
+  private String valueBorders;
+
+  /** @return the value mapped to the {@code field_split} JSON key, possibly {@code null} */
+  public String getFieldSplit() {
+    return this.fieldSplit;
+  }
+
+  /** @param fieldSplit the value to store for the {@code field_split} JSON key */
+  public void setFieldSplit(final String fieldSplit) {
+    this.fieldSplit = fieldSplit;
+  }
+
+  /** @return the value mapped to the {@code value_split} JSON key, possibly {@code null} */
+  public String getValueSplit() {
+    return this.valueSplit;
+  }
+
+  /** @param valueSplit the value to store for the {@code value_split} JSON key */
+  public void setValueSplit(final String valueSplit) {
+    this.valueSplit = valueSplit;
+  }
+
+  /** @return the value mapped to the {@code value_borders} JSON key, possibly {@code null} */
+  public String getValueBorders() {
+    return this.valueBorders;
+  }
+
+  /** @param valueBorders the value to store for the {@code value_borders} JSON key */
+  public void setValueBorders(final String valueBorders) {
+    this.valueBorders = valueBorders;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputAdapter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputAdapter.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputAdapter.java
new file mode 100644
index 0000000..86741c6
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputAdapter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.lang.reflect.Type;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+
+/**
+ * Gson deserializer that selects the concrete {@link InputDescriptorImpl} subclass from the "source"
+ * property of the input element.
+ * <p>
+ * If the element itself has no "source" property, the first entry of the statically registered global
+ * configs that has one supplies the value. The global configs are held in a static field, so they are
+ * shared by every instance of this adapter.
+ */
+public class InputAdapter implements JsonDeserializer<InputDescriptorImpl> {
+  private static JsonArray globalConfigs;
+
+  /**
+   * Registers the global configs used as a fallback when an input element has no "source" property.
+   *
+   * @param globalConfigs_ the global config entries; may be {@code null}, in which case no fallback is available
+   */
+  public static void setGlobalConfigs(JsonArray globalConfigs_) {
+    globalConfigs = globalConfigs_;
+  }
+
+  /**
+   * Deserializes the element into {@link InputFileDescriptorImpl} for source "file" or into
+   * {@link InputS3FileDescriptorImpl} for source "s3_file".
+   *
+   * @throws IllegalArgumentException if no source can be determined, or if the source is not a known one
+   */
+  @Override
+  public InputDescriptorImpl deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) {
+    String source = findSource(json);
+    if (source == null) {
+      // Switching on a null String would fail with a bare NullPointerException; report the real problem instead.
+      throw new IllegalArgumentException("Input source is not specified, neither in the input nor in the global configs");
+    }
+
+    switch (source) {
+      case "file":
+        return (InputDescriptorImpl)context.deserialize(json, InputFileDescriptorImpl.class);
+      case "s3_file":
+        return (InputDescriptorImpl)context.deserialize(json, InputS3FileDescriptorImpl.class);
+      default:
+        // Use the resolved value: the element itself may not have a "source" when it came from the global configs.
+        throw new IllegalArgumentException("Unknown input type: " + source);
+    }
+  }
+
+  /**
+   * Looks up the "source" property on the element itself, then on the global configs.
+   *
+   * @return the source, or {@code null} if neither the element nor any global config entry defines one
+   */
+  private static String findSource(JsonElement json) {
+    if (json.getAsJsonObject().has("source")) {
+      return json.getAsJsonObject().get("source").getAsString();
+    }
+    if (globalConfigs == null) {
+      return null;
+    }
+    for (JsonElement e : globalConfigs) {
+      if (e.getAsJsonObject().has("source")) {
+        return e.getAsJsonObject().get("source").getAsString();
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputConfigGson.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputConfigGson.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputConfigGson.java
new file mode 100644
index 0000000..3b78aff
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputConfigGson.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.lang.reflect.Type;
+import java.util.List;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Helper class to convert between json string and InputConfig class.
+ */
+public class InputConfigGson {
+  /** Gson instance assembled once, when this class is initialized. */
+  public static Gson gson = buildGson();
+
+  /**
+   * Builds the Gson instance: registers the adapters for the input, filter and post-map-values types,
+   * enables pretty printing and limits (de)serialization to fields annotated with {@code @Expose}.
+   */
+  private static Gson buildGson() {
+    Type inputType = new TypeToken<InputDescriptorImpl>() {}.getType();
+    Type filterType = new TypeToken<FilterDescriptorImpl>() {}.getType();
+    Type postMapValuesType = new TypeToken<List<PostMapValuesImpl>>() {}.getType();
+
+    GsonBuilder builder = new GsonBuilder();
+    builder.registerTypeAdapter(inputType, new InputAdapter());
+    builder.registerTypeAdapter(filterType, new FilterAdapter());
+    builder.registerTypeAdapter(postMapValuesType, new PostMapValuesAdapter());
+    builder.setPrettyPrinting();
+    builder.excludeFieldsWithoutExposeAnnotation();
+    return builder.create();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputConfigImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputConfigImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputConfigImpl.java
new file mode 100644
index 0000000..a4eba8e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputConfigImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.util.List;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+
+import com.google.gson.annotations.Expose;
+
+/**
+ * Implementation of {@link InputConfig} holding two Gson-exposed lists: the input descriptors
+ * ({@code input}) and the filter descriptors ({@code filter}). Both are {@code null} until assigned.
+ */
+public class InputConfigImpl implements InputConfig {
+  @Expose
+  private List<InputDescriptorImpl> input;
+
+  @Expose
+  private List<FilterDescriptorImpl> filter;
+
+  /** @return the stored input descriptor list itself (no copy is made), possibly {@code null} */
+  @Override
+  public List<? extends InputDescriptor> getInput() {
+    return this.input;
+  }
+
+  /** @param input the input descriptor list to store; the reference is kept as is */
+  public void setInput(final List<InputDescriptorImpl> input) {
+    this.input = input;
+  }
+
+  /** @return the stored filter descriptor list itself (no copy is made), possibly {@code null} */
+  @Override
+  public List<? extends FilterDescriptor> getFilter() {
+    return this.filter;
+  }
+
+  /** @param filter the filter descriptor list to store; the reference is kept as is */
+  public void setFilter(final List<FilterDescriptorImpl> filter) {
+    this.filter = filter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
new file mode 100644
index 0000000..94dcc2a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Abstract base implementation of {@link InputDescriptor} mapped with Gson.
+ * <p>
+ * Every field carries {@code @Expose}; fields whose JSON key differs from the Java name also carry
+ * {@code @SerializedName}. All properties are reference types without initializers, so each accessor
+ * may return {@code null} until the corresponding value has been assigned.
+ */
+public abstract class InputDescriptorImpl implements InputDescriptor {
+  @Expose
+  private String type;
+
+  @Expose
+  private String rowtype;
+
+  @Expose
+  private String path;
+
+  @Expose
+  @SerializedName("add_fields")
+  private Map<String, String> addFields;
+
+  @Expose
+  private String source;
+
+  @Expose
+  private Boolean tail;
+
+  @Expose
+  @SerializedName("gen_event_md5")
+  private Boolean genEventMd5;
+
+  @Expose
+  @SerializedName("use_event_md5_as_id")
+  private Boolean useEventMd5AsId;
+
+  @Expose
+  @SerializedName("start_position")
+  private String startPosition;
+
+  @Expose
+  @SerializedName("cache_enabled")
+  private Boolean cacheEnabled;
+
+  @Expose
+  @SerializedName("cache_key_field")
+  private String cacheKeyField;
+
+  @Expose
+  @SerializedName("cache_last_dedup_enabled")
+  private Boolean cacheLastDedupEnabled;
+
+  @Expose
+  @SerializedName("cache_size")
+  private Integer cacheSize;
+
+  @Expose
+  @SerializedName("cache_dedup_interval")
+  private Long cacheDedupInterval;
+
+  @Expose
+  @SerializedName("is_enabled")
+  private Boolean isEnabled;
+
+  /** @return the {@code type} property, possibly {@code null} */
+  public String getType() {
+    return this.type;
+  }
+
+  /** @param type the value to store for the {@code type} property */
+  public void setType(final String type) {
+    this.type = type;
+  }
+
+  /** @return the {@code rowtype} property, possibly {@code null} */
+  public String getRowtype() {
+    return this.rowtype;
+  }
+
+  /** @param rowType the value to store for the {@code rowtype} property */
+  public void setRowtype(final String rowType) {
+    this.rowtype = rowType;
+  }
+
+  /** @return the {@code path} property, possibly {@code null} */
+  public String getPath() {
+    return this.path;
+  }
+
+  /** @param path the value to store for the {@code path} property */
+  public void setPath(final String path) {
+    this.path = path;
+  }
+
+  /** @return the stored {@code add_fields} map itself (no copy is made), possibly {@code null} */
+  public Map<String, String> getAddFields() {
+    return this.addFields;
+  }
+
+  /** @param addFields the map to store for the {@code add_fields} property; the reference is kept as is */
+  public void setAddFields(final Map<String, String> addFields) {
+    this.addFields = addFields;
+  }
+
+  /** @return the {@code source} property, possibly {@code null} */
+  public String getSource() {
+    return this.source;
+  }
+
+  /** @param source the value to store for the {@code source} property */
+  public void setSource(final String source) {
+    this.source = source;
+  }
+
+  /** @return the {@code tail} property, possibly {@code null} */
+  public Boolean isTail() {
+    return this.tail;
+  }
+
+  /** @param tail the value to store for the {@code tail} property */
+  public void setTail(final Boolean tail) {
+    this.tail = tail;
+  }
+
+  /** @return the {@code gen_event_md5} property, possibly {@code null} */
+  public Boolean isGenEventMd5() {
+    return this.genEventMd5;
+  }
+
+  /** @param genEventMd5 the value to store for the {@code gen_event_md5} property */
+  public void setGenEventMd5(final Boolean genEventMd5) {
+    this.genEventMd5 = genEventMd5;
+  }
+
+  /** @return the {@code use_event_md5_as_id} property, possibly {@code null} */
+  public Boolean isUseEventMd5AsId() {
+    return this.useEventMd5AsId;
+  }
+
+  /** @param useEventMd5AsId the value to store for the {@code use_event_md5_as_id} property */
+  public void setUseEventMd5AsId(final Boolean useEventMd5AsId) {
+    this.useEventMd5AsId = useEventMd5AsId;
+  }
+
+  /** @return the {@code start_position} property, possibly {@code null} */
+  public String getStartPosition() {
+    return this.startPosition;
+  }
+
+  /** @param startPosition the value to store for the {@code start_position} property */
+  public void setStartPosition(final String startPosition) {
+    this.startPosition = startPosition;
+  }
+
+  /** @return the {@code cache_enabled} property, possibly {@code null} */
+  public Boolean isCacheEnabled() {
+    return this.cacheEnabled;
+  }
+
+  /** @param cacheEnabled the value to store for the {@code cache_enabled} property */
+  public void setCacheEnabled(final Boolean cacheEnabled) {
+    this.cacheEnabled = cacheEnabled;
+  }
+
+  /** @return the {@code cache_key_field} property, possibly {@code null} */
+  public String getCacheKeyField() {
+    return this.cacheKeyField;
+  }
+
+  /** @param cacheKeyField the value to store for the {@code cache_key_field} property */
+  public void setCacheKeyField(final String cacheKeyField) {
+    this.cacheKeyField = cacheKeyField;
+  }
+
+  /** @return the {@code cache_last_dedup_enabled} property, possibly {@code null} */
+  public Boolean getCacheLastDedupEnabled() {
+    return this.cacheLastDedupEnabled;
+  }
+
+  /** @param cacheLastDedupEnabled the value to store for the {@code cache_last_dedup_enabled} property */
+  public void setCacheLastDedupEnabled(final Boolean cacheLastDedupEnabled) {
+    this.cacheLastDedupEnabled = cacheLastDedupEnabled;
+  }
+
+  /** @return the {@code cache_size} property, possibly {@code null} */
+  public Integer getCacheSize() {
+    return this.cacheSize;
+  }
+
+  /** @param cacheSize the value to store for the {@code cache_size} property */
+  public void setCacheSize(final Integer cacheSize) {
+    this.cacheSize = cacheSize;
+  }
+
+  /** @return the {@code cache_dedup_interval} property, possibly {@code null} */
+  public Long getCacheDedupInterval() {
+    return this.cacheDedupInterval;
+  }
+
+  /** @param cacheDedupInterval the value to store for the {@code cache_dedup_interval} property */
+  public void setCacheDedupInterval(final Long cacheDedupInterval) {
+    this.cacheDedupInterval = cacheDedupInterval;
+  }
+
+  /** @return the {@code is_enabled} property, possibly {@code null} */
+  public Boolean isEnabled() {
+    return this.isEnabled;
+  }
+
+  /** @param isEnabled the value to store for the {@code is_enabled} property */
+  public void setIsEnabled(final Boolean isEnabled) {
+    this.isEnabled = isEnabled;
+  }
+}


[3/4] ambari git commit: AMBARI-21033 Log Search use POJOs for input configuration (mgergely)

Posted by mg...@apache.org.
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java
new file mode 100644
index 0000000..51c7ec8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Implementation of {@link InputFileBaseDescriptor} extending {@link InputDescriptorImpl} with three
+ * Gson-exposed properties: {@code checkpoint_interval_ms}, {@code process_file} and {@code copy_file}.
+ * Each is a boxed value that remains {@code null} until assigned.
+ */
+public class InputFileBaseDescriptorImpl extends InputDescriptorImpl implements InputFileBaseDescriptor {
+  @Expose
+  @SerializedName("checkpoint_interval_ms")
+  private Integer checkpointIntervalMs;
+
+  @Expose
+  @SerializedName("process_file")
+  private Boolean processFile;
+
+  @Expose
+  @SerializedName("copy_file")
+  private Boolean copyFile;
+
+  /** @return the {@code checkpoint_interval_ms} property, possibly {@code null} */
+  @Override
+  public Integer getCheckpointIntervalMs() {
+    return this.checkpointIntervalMs;
+  }
+
+  /** @param checkpointIntervalMs the value to store for the {@code checkpoint_interval_ms} property */
+  public void setCheckpointIntervalMs(final Integer checkpointIntervalMs) {
+    this.checkpointIntervalMs = checkpointIntervalMs;
+  }
+
+  /** @return the {@code process_file} property, possibly {@code null} */
+  @Override
+  public Boolean getProcessFile() {
+    return this.processFile;
+  }
+
+  /** @param processFile the value to store for the {@code process_file} property */
+  public void setProcessFile(final Boolean processFile) {
+    this.processFile = processFile;
+  }
+
+  /** @return the {@code copy_file} property, possibly {@code null} */
+  @Override
+  public Boolean getCopyFile() {
+    return this.copyFile;
+  }
+
+  /** @param copyFile the value to store for the {@code copy_file} property */
+  public void setCopyFile(final Boolean copyFile) {
+    this.copyFile = copyFile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
new file mode 100644
index 0000000..3bfd161
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
+
+public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl implements InputFileDescriptor {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java
new file mode 100644
index 0000000..277a57c
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Implementation of {@link InputS3FileDescriptor} extending {@link InputFileBaseDescriptorImpl} with the
+ * Gson-exposed {@code s3_access_key} and {@code s3_secret_key} properties.
+ * <p>
+ * NOTE(review): both keys are annotated with {@code @Expose}, so they are part of the JSON produced when
+ * this descriptor is serialized; confirm that output is never written to logs.
+ */
+public class InputS3FileDescriptorImpl extends InputFileBaseDescriptorImpl implements InputS3FileDescriptor {
+  @Expose
+  @SerializedName("s3_access_key")
+  private String s3AccessKey;
+
+  @Expose
+  @SerializedName("s3_secret_key")
+  private String s3SecretKey;
+
+  /** @return the {@code s3_access_key} property, possibly {@code null} */
+  @Override
+  public String getS3AccessKey() {
+    return this.s3AccessKey;
+  }
+
+  /** @param s3AccessKey the value to store for the {@code s3_access_key} property */
+  public void setS3AccessKey(final String s3AccessKey) {
+    this.s3AccessKey = s3AccessKey;
+  }
+
+  /** @return the {@code s3_secret_key} property, possibly {@code null} */
+  @Override
+  public String getS3SecretKey() {
+    return this.s3SecretKey;
+  }
+
+  /** @param s3SecretKey the value to store for the {@code s3_secret_key} property */
+  public void setS3SecretKey(final String s3SecretKey) {
+    this.s3SecretKey = s3SecretKey;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java
new file mode 100644
index 0000000..9daad2b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Implementation of {@link MapDateDescriptor} identified by the JSON name "map_date" and carrying the
+ * Gson-exposed {@code source_date_pattern} and {@code target_date_pattern} properties.
+ */
+public class MapDateDescriptorImpl implements MapDateDescriptor {
+  @Expose
+  @SerializedName("source_date_pattern")
+  private String sourceDatePattern;
+
+  @Expose
+  @SerializedName("target_date_pattern")
+  private String targetDatePattern;
+
+  /** @return the constant JSON name of this descriptor, {@code "map_date"} */
+  @Override
+  public String getJsonName() {
+    return "map_date";
+  }
+
+  /** @return the {@code source_date_pattern} property, possibly {@code null} */
+  @Override
+  public String getSourceDatePattern() {
+    return this.sourceDatePattern;
+  }
+
+  /** @param sourceDatePattern the value to store for the {@code source_date_pattern} property */
+  public void setSourceDatePattern(final String sourceDatePattern) {
+    this.sourceDatePattern = sourceDatePattern;
+  }
+
+  /** @return the {@code target_date_pattern} property, possibly {@code null} */
+  @Override
+  public String getTargetDatePattern() {
+    return this.targetDatePattern;
+  }
+
+  /** @param targetDatePattern the value to store for the {@code target_date_pattern} property */
+  public void setTargetDatePattern(final String targetDatePattern) {
+    this.targetDatePattern = targetDatePattern;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java
new file mode 100644
index 0000000..4a8d746
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Implementation of {@link MapFieldCopyDescriptor} identified by the JSON name "map_fieldcopy" and
+ * carrying the Gson-exposed {@code copy_name} property.
+ */
+public class MapFieldCopyDescriptorImpl implements MapFieldCopyDescriptor {
+  @Expose
+  @SerializedName("copy_name")
+  private String copyName;
+
+  /** @return the constant JSON name of this descriptor, {@code "map_fieldcopy"} */
+  @Override
+  public String getJsonName() {
+    return "map_fieldcopy";
+  }
+
+  /** @return the {@code copy_name} property, possibly {@code null} */
+  @Override
+  public String getCopyName() {
+    return this.copyName;
+  }
+
+  /** @param copyName the value to store for the {@code copy_name} property */
+  public void setCopyName(final String copyName) {
+    this.copyName = copyName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java
new file mode 100644
index 0000000..333cb67
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Implementation of {@link MapFieldNameDescriptor} identified by the JSON name "map_fieldname" and
+ * carrying the Gson-exposed {@code new_fieldname} property.
+ */
+public class MapFieldNameDescriptorImpl implements MapFieldNameDescriptor {
+  @Expose
+  @SerializedName("new_fieldname")
+  private String newFieldName;
+
+  /** @return the constant JSON name of this descriptor, {@code "map_fieldname"} */
+  @Override
+  public String getJsonName() {
+    return "map_fieldname";
+  }
+
+  /** @return the {@code new_fieldname} property, possibly {@code null} */
+  @Override
+  public String getNewFieldName() {
+    return this.newFieldName;
+  }
+
+  /** @param newFieldName the value to store for the {@code new_fieldname} property */
+  public void setNewFieldName(final String newFieldName) {
+    this.newFieldName = newFieldName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java
new file mode 100644
index 0000000..599e152
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Gson-annotated implementation of {@link MapFieldValueDescriptor}. Its JSON name is
+ * "map_fieldvalue" and it carries the "pre_value" and "post_value" properties.
+ */
+public class MapFieldValueDescriptorImpl implements MapFieldValueDescriptor {
+  /** Bound to the "pre_value" JSON property. */
+  @Expose
+  @SerializedName("pre_value")
+  private String preValue;
+
+  /** Bound to the "post_value" JSON property. */
+  @Expose
+  @SerializedName("post_value")
+  private String postValue;
+
+  /** @return the JSON key identifying this descriptor type */
+  @Override
+  public String getJsonName() {
+    return "map_fieldvalue";
+  }
+
+  @Override
+  public String getPreValue() {
+    return preValue;
+  }
+
+  public void setPreValue(String preValue) {
+    this.preValue = preValue;
+  }
+
+  @Override
+  public String getPostValue() {
+    return postValue;
+  }
+
+  public void setPostValue(String postValue) {
+    this.postValue = postValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
new file mode 100644
index 0000000..32aded8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+
+/**
+ * Gson adapter that converts between JSON and a list of {@link PostMapValuesImpl}.
+ * <p>
+ * On input, either a single JSON object or an array of objects is accepted; every object is keyed by
+ * mapper name ("map_date", "map_fieldcopy", "map_fieldname", "map_fieldvalue"). On output, a list with
+ * exactly one element is written as a bare object and any other list (including an empty one) as an array.
+ */
+public class PostMapValuesAdapter implements JsonDeserializer<List<PostMapValuesImpl>>, JsonSerializer<List<PostMapValuesImpl>> {
+  /**
+   * Deserializes a single object or an array of objects into a list.
+   *
+   * @throws JsonParseException if {@code json}, or an element of the array, is not a JSON object
+   */
+  @Override
+  public List<PostMapValuesImpl> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) {
+    List<PostMapValuesImpl> vals = new ArrayList<>();
+    if (json.isJsonArray()) {
+      for (JsonElement e : json.getAsJsonArray()) {
+        vals.add(createPostMapValues(e, context));
+      }
+    } else if (json.isJsonObject()) {
+      vals.add(createPostMapValues(json, context));
+    } else {
+      throw new JsonParseException("Unexpected JSON type: " + json.getClass());
+    }
+    return vals;
+  }
+
+  /** Builds one {@link PostMapValuesImpl} from a JSON object; entries with an unrecognized key are skipped. */
+  private PostMapValuesImpl createPostMapValues(JsonElement e, JsonDeserializationContext context) {
+    if (!e.isJsonObject()) {
+      throw new JsonParseException("Unexpected JSON type: " + e.getClass());
+    }
+    List<MapFieldDescriptor> mappers = new ArrayList<>();
+    for (Map.Entry<String, JsonElement> m : e.getAsJsonObject().entrySet()) {
+      switch (m.getKey()) {
+        case "map_date":
+          mappers.add((MapDateDescriptorImpl)context.deserialize(m.getValue(), MapDateDescriptorImpl.class));
+          break;
+        case "map_fieldcopy":
+          mappers.add((MapFieldCopyDescriptorImpl)context.deserialize(m.getValue(), MapFieldCopyDescriptorImpl.class));
+          break;
+        case "map_fieldname":
+          mappers.add((MapFieldNameDescriptorImpl)context.deserialize(m.getValue(), MapFieldNameDescriptorImpl.class));
+          break;
+        case "map_fieldvalue":
+          mappers.add((MapFieldValueDescriptorImpl)context.deserialize(m.getValue(), MapFieldValueDescriptorImpl.class));
+          break;
+        default:
+          // NOTE(review): unknown mapper keys are only reported on stdout and otherwise ignored.
+          System.out.println("Unknown key: " + m.getKey());
+          break;
+      }
+    }
+
+    PostMapValuesImpl postMapValues = new PostMapValuesImpl();
+    postMapValues.setMappers(mappers);
+    return postMapValues;
+  }
+
+  /** Serializes a one-element list as a bare object and any other list as an array of objects. */
+  @Override
+  public JsonElement serialize(List<PostMapValuesImpl> src, Type typeOfSrc, JsonSerializationContext context) {
+    if (src.size() == 1) {
+      return createMapperObject(src.get(0), context);
+    } else {
+      JsonArray jsonArray = new JsonArray();
+      for (PostMapValuesImpl postMapValues : src) {
+        jsonArray.add(createMapperObject(postMapValues, context));
+      }
+      return jsonArray;
+    }
+  }
+
+  /** Writes each mapper under its {@link MapFieldDescriptor#getJsonName()} key; a null mapper list yields an empty object. */
+  private JsonElement createMapperObject(PostMapValuesImpl postMapValues, JsonSerializationContext context) {
+    JsonObject jsonObject = new JsonObject();
+    List<MapFieldDescriptor> mappers = postMapValues.getMappers();
+    if (mappers != null) {
+      for (MapFieldDescriptor m : mappers) {
+        jsonObject.add(m.getJsonName(), context.serialize(m));
+      }
+    }
+    return jsonObject;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java
new file mode 100644
index 0000000..4d2254a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.util.List;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
+
+import com.google.gson.annotations.Expose;
+
+/**
+ * Implementation of {@link PostMapValues} that holds a list of mapper descriptors.
+ */
+public class PostMapValuesImpl implements PostMapValues {
+  /** Mapper descriptors; no default is assigned, so this is {@code null} until a value is set. */
+  @Expose
+  private List<MapFieldDescriptor> mappers;
+
+  @Override
+  public List<MapFieldDescriptor> getMappers() {
+    return mappers;
+  }
+
+  public void setMappers(List<MapFieldDescriptor> mappers) {
+    this.mappers = mappers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index c853f42..8d7c69f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -34,7 +34,7 @@ import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.metrics.MetricsManager;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.SSLUtil;
-import org.apache.curator.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.log4j.Logger;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
index 68897e8..cfcc199 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
@@ -20,54 +20,19 @@
 package org.apache.ambari.logfeeder.common;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
 import org.apache.log4j.Priority;
 
 
-public abstract class ConfigBlock {
-  private static final Logger LOG = Logger.getLogger(ConfigBlock.class);
-
-  private boolean drain = false;
-
+public abstract class ConfigBlock extends ConfigItem {
   protected Map<String, Object> configs;
   protected Map<String, String> contextFields = new HashMap<String, String>();
-  public MetricData statMetric = new MetricData(getStatMetricName(), false);
-  protected String getStatMetricName() {
-    return null;
-  }
-  
   public ConfigBlock() {
   }
 
-  /**
-   * Used while logging. Keep it short and meaningful
-   */
-  public abstract String getShortDescription();
-
-  /**
-   * Every implementor need to give name to the thread they create
-   */
-  public String getNameForThread() {
-    return this.getClass().getSimpleName();
-  }
-
-  public void addMetricsContainers(List<MetricData> metricsList) {
-    metricsList.add(statMetric);
-  }
-
-  /**
-   * This method needs to be overwritten by deriving classes.
-   */
-  public void init() throws Exception {
-  }
-
   public void loadConfig(Map<String, Object> map) {
     configs = LogFeederUtil.cloneObject(map);
 
@@ -81,46 +46,6 @@ public abstract class ConfigBlock {
     return configs;
   }
 
-  @SuppressWarnings("unchecked")
-  public boolean isEnabled() {
-    boolean isEnabled = getBooleanValue("is_enabled", true);
-    if (isEnabled) {
-      // Let's check for static conditions
-      Map<String, Object> conditions = (Map<String, Object>) configs.get("conditions");
-      boolean allow = true;
-      if (MapUtils.isNotEmpty(conditions)) {
-        allow = false;
-        for (String conditionType : conditions.keySet()) {
-          if (conditionType.equalsIgnoreCase("fields")) {
-            Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
-            for (String fieldName : fields.keySet()) {
-              Object values = fields.get(fieldName);
-              if (values instanceof String) {
-                allow = isFieldConditionMatch(fieldName, (String) values);
-              } else {
-                List<String> listValues = (List<String>) values;
-                for (String stringValue : listValues) {
-                  allow = isFieldConditionMatch(fieldName, stringValue);
-                  if (allow) {
-                    break;
-                  }
-                }
-              }
-              if (allow) {
-                break;
-              }
-            }
-          }
-          if (allow) {
-            break;
-          }
-        }
-        isEnabled = allow;
-      }
-    }
-    return isEnabled;
-  }
-
   public boolean isFieldConditionMatch(String fieldName, String stringValue) {
     boolean allow = false;
     String fieldValue = (String) configs.get(fieldName);
@@ -207,27 +132,17 @@ public abstract class ConfigBlock {
     return retValue;
   }
 
-  public Map<String, String> getContextFields() {
-    return contextFields;
-  }
-
-  public void incrementStat(int count) {
-    statMetric.value += count;
-  }
-
-  public void logStatForMetric(MetricData metric, String prefixStr) {
-    LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription());
+  @Override
+  public boolean isEnabled() {
+    return getBooleanValue("is_enabled", true);
   }
 
-  public synchronized void logStat() {
-    logStatForMetric(statMetric, "Stat");
+  public Map<String, String> getContextFields() {
+    return contextFields;
   }
 
   public boolean logConfigs(Priority level) {
-    if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) {
-      return false;
-    }
-    if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) {
+    if (!super.logConfigs(level)) {
       return false;
     }
     LOG.log(level, "Printing configuration Block=" + getShortDescription());
@@ -235,12 +150,4 @@ public abstract class ConfigBlock {
     LOG.log(level, "contextFields=" + contextFields);
     return true;
   }
-
-  public boolean isDrain() {
-    return drain;
-  }
-
-  public void setDrain(boolean drain) {
-    this.drain = drain;
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index effe980..726ff27 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -46,13 +46,19 @@ import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
 import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterDescriptorImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.gson.reflect.TypeToken;
 
 public class ConfigHandler implements InputConfigMonitor {
@@ -61,10 +67,11 @@ public class ConfigHandler implements InputConfigMonitor {
   private final OutputManager outputManager = new OutputManager();
   private final InputManager inputManager = new InputManager();
 
-  public static Map<String, Object> globalConfigs = new HashMap<>();
+  private final Map<String, Object> globalConfigs = new HashMap<>();
+  private final List<String> globalConfigJsons = new ArrayList<String>();
 
-  private final List<Map<String, Object>> inputConfigList = new ArrayList<>();
-  private final List<Map<String, Object>> filterConfigList = new ArrayList<>();
+  private final List<InputDescriptor> inputConfigList = new ArrayList<>();
+  private final List<FilterDescriptor> filterConfigList = new ArrayList<>();
   private final List<Map<String, Object>> outputConfigList = new ArrayList<>();
   
   private boolean simulateMode = false;
@@ -141,11 +148,12 @@ public class ConfigHandler implements InputConfigMonitor {
   }
   
   @Override
-  public void loadInputConfigs(String serviceName, String inputConfigData) throws Exception {
+  public void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception {
     inputConfigList.clear();
     filterConfigList.clear();
     
-    loadConfigs(inputConfigData);
+    inputConfigList.addAll(inputConfig.getInput());
+    filterConfigList.addAll(inputConfig.getFilter());
     
     if (simulateMode) {
       InputSimulate.loadTypeToFilePath(inputConfigList);
@@ -173,14 +181,7 @@ public class ConfigHandler implements InputConfigMonitor {
       switch (key) {
         case "global" :
           globalConfigs.putAll((Map<String, Object>) configMap.get(key));
-          break;
-        case "input" :
-          List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key);
-          inputConfigList.addAll(inputConfig);
-          break;
-        case "filter" :
-          List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key);
-          filterConfigList.addAll(filterConfig);
+          globalConfigJsons.add(configData);
           break;
         case "output" :
           List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
@@ -192,21 +193,28 @@ public class ConfigHandler implements InputConfigMonitor {
     }
   }
   
+  @Override
+  public List<String> getGlobalConfigJsons() {
+    return globalConfigJsons;
+  }
+  
   private void simulateIfNeeded() throws Exception {
     int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0);
     if (simulatedInputNumber == 0)
       return;
     
-    List<Map<String, Object>> simulateInputConfigList = new ArrayList<>();
+    InputConfigImpl simulateInputConfig = new InputConfigImpl();
+    List<InputDescriptorImpl> inputConfigDescriptors = new ArrayList<>();
+    simulateInputConfig.setInput(inputConfigDescriptors);
+    simulateInputConfig.setFilter(new ArrayList<FilterDescriptorImpl>());
     for (int i = 0; i < simulatedInputNumber; i++) {
-      HashMap<String, Object> mapList = new HashMap<String, Object>();
-      mapList.put("source", "simulate");
-      mapList.put("rowtype", "service");
-      simulateInputConfigList.add(mapList);
+      InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
+      inputDescriptor.setSource("simulate");
+      inputDescriptor.setRowtype("service");
+      inputDescriptor.setAddFields(new HashMap<String, String>());
+      inputConfigDescriptors.add(inputDescriptor);
     }
     
-    Map<String, List<Map<String, Object>>> simulateInputConfigMap = ImmutableMap.of("input", simulateInputConfigList);
-    String simulateInputConfig = LogFeederUtil.getGson().toJson(simulateInputConfigMap);
     loadInputConfigs("Simulation", simulateInputConfig);
     
     simulateMode = true;
@@ -233,7 +241,7 @@ public class ConfigHandler implements InputConfigMonitor {
       output.loadConfig(map);
 
       // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
-      if (output.getBooleanValue("is_enabled", true)) {
+      if (output.isEnabled()) {
         output.logConfigs(Level.INFO);
         outputManager.add(output);
       } else {
@@ -243,24 +251,23 @@ public class ConfigHandler implements InputConfigMonitor {
   }
 
   private void loadInputs(String serviceName) {
-    for (Map<String, Object> map : inputConfigList) {
-      if (map == null) {
+    for (InputDescriptor inputDescriptor : inputConfigList) {
+      if (inputDescriptor == null) {
         continue;
       }
-      mergeBlocks(globalConfigs, map);
 
-      String value = (String) map.get("source");
-      if (StringUtils.isEmpty(value)) {
+      String source = (String) inputDescriptor.getSource();
+      if (StringUtils.isEmpty(source)) {
         LOG.error("Input block doesn't have source element");
         continue;
       }
-      Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT);
+      Input input = (Input) AliasUtil.getClassInstance(source, AliasType.INPUT);
       if (input == null) {
         LOG.error("Input object could not be found");
         continue;
       }
-      input.setType(value);
-      input.loadConfig(map);
+      input.setType(source);
+      input.loadConfig(inputDescriptor);
 
       if (input.isEnabled()) {
         input.setOutputManager(outputManager);
@@ -278,13 +285,20 @@ public class ConfigHandler implements InputConfigMonitor {
 
     List<Input> toRemoveInputList = new ArrayList<Input>();
     for (Input input : inputManager.getInputList(serviceName)) {
-      for (Map<String, Object> map : filterConfigList) {
-        if (map == null) {
+      for (FilterDescriptor filterDescriptor : filterConfigList) {
+        if (filterDescriptor == null) {
+          continue;
+        }
+        if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
+          LOG.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled");
+          continue;
+        }
+        if (!input.isFilterRequired(filterDescriptor)) {
+          LOG.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription());
           continue;
         }
-        mergeBlocks(globalConfigs, map);
 
-        String value = (String) map.get("filter");
+        String value = filterDescriptor.getFilter();
         if (StringUtils.isEmpty(value)) {
           LOG.error("Filter block doesn't have filter element");
           continue;
@@ -294,16 +308,12 @@ public class ConfigHandler implements InputConfigMonitor {
           LOG.error("Filter object could not be found");
           continue;
         }
-        filter.loadConfig(map);
+        filter.loadConfig(filterDescriptor);
         filter.setInput(input);
 
-        if (filter.isEnabled()) {
-          filter.setOutputManager(outputManager);
-          input.addFilter(filter);
-          filter.logConfigs(Level.INFO);
-        } else {
-          LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
-        }
+        filter.setOutputManager(outputManager);
+        input.addFilter(filter);
+        filter.logConfigs(Level.INFO);
       }
       
       if (input.getFirstFilter() == null) {
@@ -318,43 +328,31 @@ public class ConfigHandler implements InputConfigMonitor {
   }
 
+  /**
+   * Sorts filterConfigList by ascending sort order.
+   * NOTE(review): a descriptor without a sort order compares as equal to every other descriptor, which
+   * is not a consistent ordering when null and non-null sort orders are mixed - confirm whether intended.
+   */
   private void sortFilters() {
-    Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() {
-
+    Collections.sort(filterConfigList, new Comparator<FilterDescriptor>() {
       @Override
-      public int compare(Map<String, Object> o1, Map<String, Object> o2) {
-        Object o1Sort = o1.get("sort_order");
-        Object o2Sort = o2.get("sort_order");
+      public int compare(FilterDescriptor o1, FilterDescriptor o2) {
+        Integer o1Sort = o1.getSortOrder();
+        Integer o2Sort = o2.getSortOrder();
         if (o1Sort == null || o2Sort == null) {
           return 0;
         }
         
-        int o1Value = parseSort(o1, o1Sort);
-        int o2Value = parseSort(o2, o2Sort);
-        
-        return o1Value - o2Value;
-      }
-
-      private int parseSort(Map<String, Object> map, Object o) {
-        if (!(o instanceof Number)) {
-          try {
-            return (new Double(Double.parseDouble(o.toString()))).intValue();
-          } catch (Throwable t) {
-            LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString()
-              + ", map=" + map.toString());
-            return 0;
-          }
-        } else {
-          return ((Number) o).intValue();
-        }
+        // Integer.compare avoids the overflow that plain subtraction can produce for extreme values.
+        return Integer.compare(o1Sort, o2Sort);
       }
-    });
+    } );
   }
 
   private void assignOutputsToInputs(String serviceName) {
     Set<Output> usedOutputSet = new HashSet<Output>();
     for (Input input : inputManager.getInputList(serviceName)) {
       for (Output output : outputManager.getOutputs()) {
-        if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) {
+        if (input.isOutputRequired(output)) {
           usedOutputSet.add(output);
           input.addOutput(output);
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
new file mode 100644
index 0000000..5c20a8e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.common;
+
+import java.util.List;
+
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Priority;
+
+/**
+ * Common base for configurable items: carries a statistics metric and a drain flag, and provides
+ * logging helpers shared by subclasses.
+ */
+public abstract class ConfigItem {
+
+  protected static final Logger LOG = Logger.getLogger(ConfigItem.class);
+  private boolean drain = false;
+  /** Named via {@link #getStatMetricName()}, which therefore runs while the instance is still being constructed. */
+  public MetricData statMetric = new MetricData(getStatMetricName(), false);
+
+  public ConfigItem() {
+    super();
+  }
+
+  /**
+   * Supplies the name given to {@link #statMetric}. It is invoked from a field initializer, so overrides
+   * must not rely on subclass fields being initialized. The default returns {@code null}.
+   */
+  protected String getStatMetricName() {
+    return null;
+  }
+
+  /**
+   * Used while logging. Keep it short and meaningful
+   */
+  public abstract String getShortDescription();
+
+  /**
+   * Every implementor need to give name to the thread they create
+   */
+  public String getNameForThread() {
+    return this.getClass().getSimpleName();
+  }
+
+  /** Appends {@link #statMetric} to the given list. */
+  public void addMetricsContainers(List<MetricData> metricsList) {
+    metricsList.add(statMetric);
+  }
+
+  /**
+   * Initialization hook for deriving classes to override; the default does nothing.
+   */
+  public void init() throws Exception {
+  }
+
+  /** Tells whether this item is enabled. */
+  public abstract boolean isEnabled();
+
+  /** Adds {@code count} to the value of {@link #statMetric}. */
+  public void incrementStat(int count) {
+    statMetric.value += count;
+  }
+
+  /** Logs the given metric through LogFeederUtil, tagging it with this item's short description as the key. */
+  public void logStatForMetric(MetricData metric, String prefixStr) {
+    LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription());
+  }
+
+  /** Logs {@link #statMetric} with the "Stat" prefix. */
+  public synchronized void logStat() {
+    logStatForMetric(statMetric, "Stat");
+  }
+
+  /**
+   * Checks whether configuration output at the given level would be emitted; logs nothing itself.
+   *
+   * @return {@code false} if {@code level} is INFO or DEBUG and that level is disabled on LOG, {@code true} otherwise
+   */
+  public boolean logConfigs(Priority level) {
+    if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) {
+      return false;
+    }
+    if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) {
+      return false;
+    }
+    return true;
+  }
+
+  public boolean isDrain() {
+    return drain;
+  }
+
+  public void setDrain(boolean drain) {
+    this.drain = drain;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index afd903e..fd02497 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.common.ConfigBlock;
+import org.apache.ambari.logfeeder.common.ConfigItem;
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
@@ -33,18 +33,28 @@ import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.output.OutputManager;
 import org.apache.ambari.logfeeder.util.AliasUtil;
 import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
-import org.apache.log4j.Logger;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.log4j.Priority;
 
-public abstract class Filter extends ConfigBlock {
-  private static final Logger LOG = Logger.getLogger(Filter.class);
-
+public abstract class Filter extends ConfigItem {
+  protected FilterDescriptor filterDescriptor;
   protected Input input;
   private Filter nextFilter = null;
   private OutputManager outputManager;
 
   private Map<String, List<Mapper>> postFieldValueMappers = new HashMap<String, List<Mapper>>();
 
+  public void loadConfig(FilterDescriptor filterDescriptor) {
+    this.filterDescriptor = filterDescriptor;
+  }
+
+  public FilterDescriptor getFilterDescriptor() {
+    return filterDescriptor;
+  }
+
   @Override
   public void init() throws Exception {
     super.init();
@@ -55,28 +65,22 @@ public abstract class Filter extends ConfigBlock {
     }
   }
 
-  @SuppressWarnings("unchecked")
   private void initializePostMapValues() {
-    Map<String, Object> postMapValues = (Map<String, Object>) getConfigValue("post_map_values");
+    Map<String, ? extends List<? extends PostMapValues>> postMapValues = filterDescriptor.getPostMapValues();
     if (postMapValues == null) {
       return;
     }
     for (String fieldName : postMapValues.keySet()) {
-      List<Map<String, Object>> mapList = null;
-      Object values = postMapValues.get(fieldName);
-      if (values instanceof List<?>) {
-        mapList = (List<Map<String, Object>>) values;
-      } else {
-        mapList = new ArrayList<Map<String, Object>>();
-        mapList.add((Map<String, Object>) values);
-      }
-      for (Map<String, Object> mapObject : mapList) {
-        for (String mapClassCode : mapObject.keySet()) {
+      List<? extends PostMapValues> values = postMapValues.get(fieldName);
+      for (PostMapValues pmv : values) {
+        for (MapFieldDescriptor mapFieldDescriptor : pmv.getMappers()) {
+          String mapClassCode = mapFieldDescriptor.getJsonName();
           Mapper mapper = (Mapper) AliasUtil.getClassInstance(mapClassCode, AliasType.MAPPER);
           if (mapper == null) {
-            break;
+            LOG.warn("Unknown mapper type: " + mapClassCode);
+            continue;
           }
-          if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapObject.get(mapClassCode))) {
+          if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapFieldDescriptor)) {
             List<Mapper> fieldMapList = postFieldValueMappers.get(fieldName);
             if (fieldMapList == null) {
               fieldMapList = new ArrayList<Mapper>();
@@ -156,15 +160,8 @@ public abstract class Filter extends ConfigBlock {
   }
 
   @Override
-  public boolean isFieldConditionMatch(String fieldName, String stringValue) {
-    if (!super.isFieldConditionMatch(fieldName, stringValue)) {
-      if (input != null) {
-        return input.isFieldConditionMatch(fieldName, stringValue);
-      } else {
-        return false;
-      }
-    }
-    return true;
+  public boolean isEnabled() {
+    return BooleanUtils.isNotFalse(filterDescriptor.isEnabled());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 7e2da70..70aea65 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -38,6 +38,8 @@ import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
+import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -75,11 +77,10 @@ public class FilterGrok extends Filter {
     super.init();
 
     try {
-      messagePattern = escapePattern(getStringValue("message_pattern"));
-      multilinePattern = escapePattern(getStringValue("multiline_pattern"));
-      sourceField = getStringValue("source_field");
-      removeSourceField = getBooleanValue("remove_source_field",
-        removeSourceField);
+      messagePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMessagePattern());
+      multilinePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMultilinePattern());
+      sourceField = ((FilterGrokDescriptor)filterDescriptor).getSourceField();
+      removeSourceField = BooleanUtils.toBooleanDefaultIfNull(filterDescriptor.isRemoveSourceField(), removeSourceField);
 
       LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " +
       getShortDescription());

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index 35f692e..cfccdeb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -25,12 +25,9 @@ import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.util.DateUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
 
 public class FilterJSON extends Filter {
   
-  private static final Logger LOG  = Logger.getLogger(FilterJSON.class);
-
   @Override
   public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException {
     Map<String, Object> jsonMap = null;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index b04a439..f2a4186 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -28,13 +28,11 @@ import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 
 public class FilterKeyValue extends Filter {
-  private static final Logger LOG = Logger.getLogger(FilterKeyValue.class);
-
   private String sourceField = null;
   private String valueSplit = "=";
   private String fieldSplit = "\t";
@@ -46,10 +44,10 @@ public class FilterKeyValue extends Filter {
   public void init() throws Exception {
     super.init();
 
-    sourceField = getStringValue("source_field");
-    valueSplit = getStringValue("value_split", valueSplit);
-    fieldSplit = getStringValue("field_split", fieldSplit);
-    valueBorders = getStringValue("value_borders");
+    sourceField = filterDescriptor.getSourceField();
+    valueSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getValueSplit(), valueSplit);
+    fieldSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getFieldSplit(), fieldSplit);
+    valueBorders = ((FilterKeyValueDescriptor)filterDescriptor).getValueBorders();
 
     LOG.info("init() done. source_field=" + sourceField + ", value_split=" + valueSplit + ", " + ", field_split=" +
         fieldSplit + ", " + getShortDescription());

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
index 41a1fa5..cfa1903 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -29,14 +29,14 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 
 public abstract class AbstractInputFile extends Input {
-  protected static final Logger LOG = Logger.getLogger(AbstractInputFile.class);
-
   private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000;
 
   protected File[] logFiles;
@@ -73,16 +73,16 @@ public abstract class AbstractInputFile extends Input {
 
     // Let's close the file and set it to true after we start monitoring it
     setClosed(true);
-    logPath = getStringValue("path");
-    tail = getBooleanValue("tail", tail);
-    checkPointIntervalMS = getIntValue("checkpoint.interval.ms", DEFAULT_CHECKPOINT_INTERVAL_MS);
+    logPath = inputDescriptor.getPath();
+    tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), tail);
+    checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)inputDescriptor).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS);
 
     if (StringUtils.isEmpty(logPath)) {
       LOG.error("path is empty for file input. " + getShortDescription());
       return;
     }
 
-    String startPosition = getStringValue("start_position");
+    String startPosition = inputDescriptor.getStartPosition();
     if (StringUtils.isEmpty(startPosition) || startPosition.equalsIgnoreCase("beginning") ||
         startPosition.equalsIgnoreCase("begining") || !tail) {
       isStartFromBegining = true;
@@ -313,7 +313,7 @@ public abstract class AbstractInputFile extends Input {
 
   @Override
   public String getShortDescription() {
-    return "input:source=" + getStringValue("source") + ", path=" +
+    return "input:source=" + inputDescriptor.getSource() + ", path=" +
         (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 9f54d8a..fba596d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -21,23 +21,25 @@ package org.apache.ambari.logfeeder.input;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.input.cache.LRUCache;
-import org.apache.ambari.logfeeder.common.ConfigBlock;
+import org.apache.ambari.logfeeder.common.ConfigItem;
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.output.Output;
 import org.apache.ambari.logfeeder.output.OutputManager;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
-
-public abstract class Input extends ConfigBlock implements Runnable {
-  private static final Logger LOG = Logger.getLogger(Input.class);
-
+import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.log4j.Priority;
+
+public abstract class Input extends ConfigItem implements Runnable {
   private static final boolean DEFAULT_TAIL = true;
   private static final boolean DEFAULT_USE_EVENT_MD5 = false;
   private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
@@ -47,12 +49,8 @@ public abstract class Input extends ConfigBlock implements Runnable {
   private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000;
   private static final String DEFAULT_CACHE_KEY_FIELD = "log_message";
 
-  private static final String CACHE_ENABLED = "cache_enabled";
-  private static final String CACHE_KEY_FIELD = "cache_key_field";
-  private static final String CACHE_LAST_DEDUP_ENABLED = "cache_last_dedup_enabled";
-  private static final String CACHE_SIZE = "cache_size";
-  private static final String CACHE_DEDUP_INTERVAL = "cache_dedup_interval";
-
+  protected InputDescriptor inputDescriptor;
+  
   protected InputManager inputManager;
   protected OutputManager outputManager;
   private List<Output> outputList = new ArrayList<Output>();
@@ -75,21 +73,12 @@ public abstract class Input extends ConfigBlock implements Runnable {
     return null;
   }
   
-  @Override
-  public void loadConfig(Map<String, Object> map) {
-    super.loadConfig(map);
-    String typeValue = getStringValue("type");
-    if (typeValue != null) {
-      // Explicitly add type and value to field list
-      contextFields.put("type", typeValue);
-      @SuppressWarnings("unchecked")
-      Map<String, Object> addFields = (Map<String, Object>) map.get("add_fields");
-      if (addFields == null) {
-        addFields = new HashMap<String, Object>();
-        map.put("add_fields", addFields);
-      }
-      addFields.put("type", typeValue);
-    }
+  public void loadConfig(InputDescriptor inputDescriptor) {
+    this.inputDescriptor = inputDescriptor;
+  }
+
+  public InputDescriptor getInputDescriptor() {
+    return inputDescriptor;
   }
 
   public void setType(String type) {
@@ -104,6 +93,12 @@ public abstract class Input extends ConfigBlock implements Runnable {
     this.outputManager = outputManager;
   }
 
+  public boolean isFilterRequired(FilterDescriptor filterDescriptor) {
+    Conditions conditions = filterDescriptor.getConditions();
+    Fields fields = conditions.getFields();
+    return fields.getType().contains(inputDescriptor.getType());
+  }
+
   public void addFilter(Filter filter) {
     if (firstFilter == null) {
       firstFilter = filter;
@@ -116,6 +111,22 @@ public abstract class Input extends ConfigBlock implements Runnable {
     }
   }
 
+  @SuppressWarnings("unchecked")
+  public boolean isOutputRequired(Output output) {
+    Map<String, Object> conditions = (Map<String, Object>) output.getConfigs().get("conditions");
+    if (conditions == null) {
+      return false;
+    }
+    
+    Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
+    if (fields == null) {
+      return false;
+    }
+    
+    List<String> types = (List<String>) fields.get("rowtype");
+    return types.contains(inputDescriptor.getRowtype());
+  }
+
   public void addOutput(Output output) {
     outputList.add(output);
   }
@@ -124,9 +135,9 @@ public abstract class Input extends ConfigBlock implements Runnable {
   public void init() throws Exception {
     super.init();
     initCache();
-    tail = getBooleanValue("tail", DEFAULT_TAIL);
-    useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5);
-    genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5);
+    tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), DEFAULT_TAIL);
+    useEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isUseEventMd5AsId(), DEFAULT_USE_EVENT_MD5);
+    genEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isGenEventMd5(), DEFAULT_GEN_EVENT_MD5);
 
     if (firstFilter != null) {
       firstFilter.init();
@@ -236,26 +247,26 @@ public abstract class Input extends ConfigBlock implements Runnable {
   }
 
   private void initCache() {
-    boolean cacheEnabled = getConfigValue(CACHE_ENABLED) != null
-      ? getBooleanValue(CACHE_ENABLED, DEFAULT_CACHE_ENABLED)
+    boolean cacheEnabled = inputDescriptor.isCacheEnabled() != null
+      ? inputDescriptor.isCacheEnabled()
       : LogFeederUtil.getBooleanProperty("logfeeder.cache.enabled", DEFAULT_CACHE_ENABLED);
     if (cacheEnabled) {
-      String cacheKeyField = getConfigValue(CACHE_KEY_FIELD) != null
-        ? getStringValue(CACHE_KEY_FIELD)
+      String cacheKeyField = inputDescriptor.getCacheKeyField() != null
+        ? inputDescriptor.getCacheKeyField()
         : LogFeederUtil.getStringProperty("logfeeder.cache.key.field", DEFAULT_CACHE_KEY_FIELD);
 
-      setCacheKeyField(getStringValue(cacheKeyField));
+      setCacheKeyField(cacheKeyField);
 
-      boolean cacheLastDedupEnabled = getConfigValue(CACHE_LAST_DEDUP_ENABLED) != null
-        ? getBooleanValue(CACHE_LAST_DEDUP_ENABLED, DEFAULT_CACHE_DEDUP_LAST)
+      boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null
+        ? inputDescriptor.getCacheLastDedupEnabled()
         : LogFeederUtil.getBooleanProperty("logfeeder.cache.last.dedup.enabled", DEFAULT_CACHE_DEDUP_LAST);
 
-      int cacheSize = getConfigValue(CACHE_SIZE) != null
-        ? getIntValue(CACHE_SIZE, DEFAULT_CACHE_SIZE)
+      int cacheSize = inputDescriptor.getCacheSize() != null
+        ? inputDescriptor.getCacheSize()
         : LogFeederUtil.getIntProperty("logfeeder.cache.size", DEFAULT_CACHE_SIZE);
 
-      long cacheDedupInterval = getConfigValue(CACHE_DEDUP_INTERVAL) != null
-        ? getLongValue(CACHE_DEDUP_INTERVAL, DEFAULT_CACHE_DEDUP_INTERVAL)
+      long cacheDedupInterval = inputDescriptor.getCacheDedupInterval() != null
+        ? inputDescriptor.getCacheDedupInterval()
         : Long.parseLong(LogFeederUtil.getStringProperty("logfeeder.cache.dedup.interval", String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL)));
 
       setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled));
@@ -319,6 +330,11 @@ public abstract class Input extends ConfigBlock implements Runnable {
   }
 
   @Override
+  public boolean isEnabled() {
+    return BooleanUtils.isNotFalse(inputDescriptor.isEnabled());
+  }
+
+  @Override
   public String getNameForThread() {
     if (filePath != null) {
       try {
@@ -331,7 +347,17 @@ public abstract class Input extends ConfigBlock implements Runnable {
   }
 
   @Override
+  public boolean logConfigs(Priority level) {
+    if (!super.logConfigs(level)) {
+      return false;
+    }
+    LOG.log(level, "Printing Input=" + getShortDescription());
+    LOG.log(level, "description=" + inputDescriptor.getPath());
+    return true;
+  }
+
+  @Override
   public String toString() {
     return getShortDescription();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 3737839..fc40ca4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -25,7 +25,9 @@ import java.io.FileNotFoundException;
 
 import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
 import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.solr.common.util.Base64;
 
@@ -62,7 +64,7 @@ public class InputFile extends AbstractInputFile {
 
   @Override
   void start() throws Exception {
-    boolean isProcessFile = getBooleanValue("process_file", true);
+    boolean isProcessFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getProcessFile(), true);
     if (isProcessFile) {
       if (tail) {
         processFile(logFiles[0]);
@@ -100,7 +102,7 @@ public class InputFile extends AbstractInputFile {
   }
 
   private void copyFiles(File[] files) {
-    boolean isCopyFile = getBooleanValue("copy_file", false);
+    boolean isCopyFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getCopyFile(), false);
     if (isCopyFile && files != null) {
       for (File file : files) {
         try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index f560379..4bf162b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.ambari.logfeeder.util.S3Util;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.solr.common.util.Base64;
 
@@ -78,8 +79,8 @@ public class InputS3File extends AbstractInputFile {
 
   @Override
   protected BufferedReader openLogFile(File logPathFile) throws IOException {
-    String s3AccessKey = getStringValue("s3_access_key");
-    String s3SecretKey = getStringValue("s3_secret_key");
+    String s3AccessKey = ((InputS3FileDescriptor)inputDescriptor).getS3AccessKey();
+    String s3SecretKey = ((InputS3FileDescriptor)inputDescriptor).getS3SecretKey();
     BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3AccessKey, s3SecretKey);
     fileKey = getFileKey(logPathFile);
     base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index d193cdb..5e7bdb3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.input;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -35,25 +34,23 @@ import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.filter.FilterJSON;
 import org.apache.ambari.logfeeder.output.Output;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
 import org.apache.commons.collections.MapUtils;
-import org.apache.log4j.Logger;
 import org.apache.solr.common.util.Base64;
 
 import com.google.common.base.Joiner;
 
 public class InputSimulate extends Input {
-  private static final Logger LOG = Logger.getLogger(InputSimulate.class);
-
   private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}";
   
   private static final Map<String, String> typeToFilePath = new HashMap<>();
-  private static List<String> inputTypes = new ArrayList<>();
-  public static void loadTypeToFilePath(List<Map<String, Object>> inputList) {
-    for (Map<String, Object> input : inputList) {
-      if (input.containsKey("type") && input.containsKey("path")) {
-        typeToFilePath.put((String)input.get("type"), (String)input.get("path"));
-        inputTypes.add((String)input.get("type"));
-      }
+  private static final List<String> inputTypes = new ArrayList<>();
+  public static void loadTypeToFilePath(List<InputDescriptor> inputList) {
+    for (InputDescriptor input : inputList) {
+      typeToFilePath.put(input.getType(), input.getPath());
+      inputTypes.add(input.getType());
     }
   }
   
@@ -86,7 +83,7 @@ public class InputSimulate extends Input {
     this.host = "#" + hostNumber.incrementAndGet() + "-" + LogFeederUtil.hostName;
     
     Filter filter = new FilterJSON();
-    filter.loadConfig(Collections.<String, Object> emptyMap());
+    filter.loadConfig(new FilterJsonDescriptorImpl());
     filter.setInput(this);
     addFilter(filter);
   }
@@ -141,7 +138,7 @@ public class InputSimulate extends Input {
     String type = types.get(typePos);
     String filePath = MapUtils.getString(typeToFilePath, type, "path of " + type);
     
-    configs.put("type", type);
+    ((InputDescriptorImpl)inputDescriptor).setType(type);
     setFilePath(filePath);
     
     return type;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
index 1f635af..6173f53 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
@@ -44,7 +44,7 @@ public enum FilterLogData {
   }
 
   public boolean isAllowed(Map<String, Object> jsonObj, InputMarker inputMarker) {
-    if ("audit".equals(inputMarker.input.getConfigs().get(LogFeederConstants.ROW_TYPE)))
+    if ("audit".equals(inputMarker.input.getInputDescriptor().getRowtype()))
       return true;
     
     boolean isAllowed = applyFilter(jsonObj);

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
index 96709c0..5facf76 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
@@ -21,12 +21,14 @@ package org.apache.ambari.logfeeder.mapper;
 
 import java.util.Map;
 
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+
 public abstract class Mapper {
   private String inputDesc;
   protected String fieldName;
   private String mapClassCode;
 
-  public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs);
+  public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor);
 
   protected void init(String inputDesc, String fieldName, String mapClassCode) {
     this.inputDesc = inputDesc;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index 6a7fad7..5d34c06 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -26,6 +26,8 @@ import java.util.Map;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
@@ -39,18 +41,11 @@ public class MapperDate extends Mapper {
   private SimpleDateFormat srcDateFormatter=null;
 
   @Override
-  public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
     init(inputDesc, fieldName, mapClassCode);
-    if (!(mapConfigs instanceof Map)) {
-      LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName() +
-        ", map=" + this);
-      return false;
-    }
     
-    @SuppressWarnings("unchecked")
-    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
-    String targetDateFormat = (String) mapObjects.get("target_date_pattern");
-    String srcDateFormat = (String) mapObjects.get("src_date_pattern");
+    String targetDateFormat = ((MapDateDescriptor)mapFieldDescriptor).getTargetDatePattern();
+    String srcDateFormat = ((MapDateDescriptor)mapFieldDescriptor).getSourceDatePattern();
     if (StringUtils.isEmpty(targetDateFormat)) {
       LOG.fatal("Date format for map is empty. " + this);
     } else {

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
index 39e1ff4..a463f49 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
@@ -21,6 +21,8 @@ package org.apache.ambari.logfeeder.mapper;
 
 import java.util.Map;
 
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
@@ -33,16 +35,9 @@ public class MapperFieldCopy extends Mapper {
   private String copyName = null;
 
   @Override
-  public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
     init(inputDesc, fieldName, mapClassCode);
-    if (!(mapConfigs instanceof Map)) {
-      LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
-      return false;
-    }
-    
-    @SuppressWarnings("unchecked")
-    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
-    copyName = (String) mapObjects.get("copy_name");
+    copyName = ((MapFieldCopyDescriptor)mapFieldDescriptor).getCopyName();
     if (StringUtils.isEmpty(copyName)) {
       LOG.fatal("Map copy name is empty.");
       return false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
index 9b6e83c..3f160da 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -22,6 +22,8 @@ package org.apache.ambari.logfeeder.mapper;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -35,16 +37,10 @@ public class MapperFieldName extends Mapper {
   private String newValue = null;
 
   @Override
-  public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
     init(inputDesc, fieldName, mapClassCode);
-    if (!(mapConfigs instanceof Map)) {
-      LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
-      return false;
-    }
-    
-    @SuppressWarnings("unchecked")
-    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
-    newValue = (String) mapObjects.get("new_fieldname");
+
+    newValue = ((MapFieldNameDescriptor)mapFieldDescriptor).getNewFieldName();
     if (StringUtils.isEmpty(newValue)) {
       LOG.fatal("Map field value is empty.");
       return false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
index 87cda65..03ff95b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -22,6 +22,8 @@ package org.apache.ambari.logfeeder.mapper;
 import java.util.Map;
 
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -36,17 +38,11 @@ public class MapperFieldValue extends Mapper {
   private String newValue = null;
 
   @Override
-  public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+  public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
     init(inputDesc, fieldName, mapClassCode);
-    if (!(mapConfigs instanceof Map)) {
-      LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
-      return false;
-    }
     
-    @SuppressWarnings("unchecked")
-    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
-    prevValue = (String) mapObjects.get("pre_value");
-    newValue = (String) mapObjects.get("post_value");
+    prevValue = ((MapFieldValueDescriptor)mapFieldDescriptor).getPreValue();
+    newValue = ((MapFieldValueDescriptor)mapFieldDescriptor).getPostValue();;
     if (StringUtils.isEmpty(newValue)) {
       LOG.fatal("Map field value is empty.");
       return false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index bc6a553..65b9e19 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -28,11 +28,8 @@ import org.apache.ambari.logfeeder.common.ConfigBlock;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
 
 public abstract class Output extends ConfigBlock {
-  private static final Logger LOG = Logger.getLogger(Output.class);
-
   private String destination = null;
 
   protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false);

http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
index fcf2695..8308a4f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
@@ -41,7 +41,7 @@ public class OutputLineFilter {
   public Boolean apply(Map<String, Object> lineMap, Input input) {
     boolean isLogFilteredOut = false;
     LRUCache inputLruCache = input.getCache();
-    if (inputLruCache != null && "service".equals(input.getConfigs().get(LogFeederConstants.ROW_TYPE))) {
+    if (inputLruCache != null && "service".equals(input.getInputDescriptor().getRowtype())) {
       String logMessage = (String) lineMap.get(input.getCacheKeyField());
       Long timestamp = null;
       if (lineMap.containsKey((LogFeederConstants.IN_MEMORY_TIMESTAMP))) {