You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mg...@apache.org on 2017/05/22 10:50:15 UTC
[3/4] ambari git commit: AMBARI-21033 Log Search use POJOs for input
configuration (mgergely)
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java
new file mode 100644
index 0000000..51c7ec8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileBaseDescriptorImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Gson-mapped implementation of {@link InputFileBaseDescriptor}.
+ *
+ * Adds the file-related settings on top of {@link InputDescriptorImpl}. Every
+ * property is a boxed type and has no initializer, so a getter returns
+ * {@code null} until the value is set through its setter or populated by Gson.
+ */
+public class InputFileBaseDescriptorImpl extends InputDescriptorImpl implements InputFileBaseDescriptor {
+  /** Value of the {@code checkpoint_interval_ms} JSON property. */
+  @Expose
+  @SerializedName("checkpoint_interval_ms")
+  private Integer checkpointIntervalMs;
+
+  /** Value of the {@code process_file} JSON property. */
+  @Expose
+  @SerializedName("process_file")
+  private Boolean processFile;
+
+  /** Value of the {@code copy_file} JSON property. */
+  @Expose
+  @SerializedName("copy_file")
+  private Boolean copyFile;
+
+  /** @return the {@code checkpoint_interval_ms} setting, or {@code null} if it was never set */
+  @Override
+  public Integer getCheckpointIntervalMs() {
+    return checkpointIntervalMs;
+  }
+
+  /** @param checkpointIntervalMs new {@code checkpoint_interval_ms} setting, may be {@code null} */
+  public void setCheckpointIntervalMs(Integer checkpointIntervalMs) {
+    this.checkpointIntervalMs = checkpointIntervalMs;
+  }
+
+  /** @return the {@code process_file} setting, or {@code null} if it was never set */
+  @Override
+  public Boolean getProcessFile() {
+    return processFile;
+  }
+
+  /** @param processFile new {@code process_file} setting, may be {@code null} */
+  public void setProcessFile(Boolean processFile) {
+    this.processFile = processFile;
+  }
+
+  /** @return the {@code copy_file} setting, or {@code null} if it was never set */
+  @Override
+  public Boolean getCopyFile() {
+    return copyFile;
+  }
+
+  /** @param copyFile new {@code copy_file} setting, may be {@code null} */
+  public void setCopyFile(Boolean copyFile) {
+    this.copyFile = copyFile;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
new file mode 100644
index 0000000..3bfd161
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
+
+/**
+ * Implementation of {@link InputFileDescriptor}.
+ *
+ * Declares no members of its own; all state and accessors are inherited from
+ * {@link InputFileBaseDescriptorImpl}.
+ */
+public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl implements InputFileDescriptor {
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java
new file mode 100644
index 0000000..277a57c
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputS3FileDescriptorImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Gson-mapped implementation of {@link InputS3FileDescriptor}.
+ *
+ * Extends {@link InputFileBaseDescriptorImpl} with the S3 access and secret key
+ * properties. Both values are {@code null} until set.
+ *
+ * NOTE(review): the secret key field is annotated with {@code @Expose}, so it is
+ * presumably written out in clear text whenever this object is serialized -
+ * confirm that this is intended.
+ */
+public class InputS3FileDescriptorImpl extends InputFileBaseDescriptorImpl implements InputS3FileDescriptor {
+  /** Value of the {@code s3_access_key} JSON property. */
+  @Expose
+  @SerializedName("s3_access_key")
+  private String s3AccessKey;
+
+  /** Value of the {@code s3_secret_key} JSON property. */
+  @Expose
+  @SerializedName("s3_secret_key")
+  private String s3SecretKey;
+
+  /** @return the {@code s3_access_key} value, or {@code null} if it was never set */
+  @Override
+  public String getS3AccessKey() {
+    return s3AccessKey;
+  }
+
+  /** @return the {@code s3_secret_key} value, or {@code null} if it was never set */
+  @Override
+  public String getS3SecretKey() {
+    return s3SecretKey;
+  }
+
+  /** @param s3AccessKey new {@code s3_access_key} value, may be {@code null} */
+  public void setS3AccessKey(String s3AccessKey) {
+    this.s3AccessKey = s3AccessKey;
+  }
+
+  /** @param s3SecretKey new {@code s3_secret_key} value, may be {@code null} */
+  public void setS3SecretKey(String s3SecretKey) {
+    this.s3SecretKey = s3SecretKey;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java
new file mode 100644
index 0000000..9daad2b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapDateDescriptorImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Gson-mapped implementation of {@link MapDateDescriptor}.
+ *
+ * Holds the source and target date patterns of a {@code map_date} mapper entry.
+ * Both patterns are {@code null} until set.
+ */
+public class MapDateDescriptorImpl implements MapDateDescriptor {
+  /** Value of the {@code source_date_pattern} JSON property. */
+  @Expose
+  @SerializedName("source_date_pattern")
+  private String sourceDatePattern;
+
+  /** Value of the {@code target_date_pattern} JSON property. */
+  @Expose
+  @SerializedName("target_date_pattern")
+  private String targetDatePattern;
+
+  /** @return the constant JSON member name of this mapper, {@code "map_date"} */
+  @Override
+  public String getJsonName() {
+    return "map_date";
+  }
+
+  /** @return the {@code source_date_pattern} value, or {@code null} if it was never set */
+  @Override
+  public String getSourceDatePattern() {
+    return sourceDatePattern;
+  }
+
+  /** @return the {@code target_date_pattern} value, or {@code null} if it was never set */
+  @Override
+  public String getTargetDatePattern() {
+    return targetDatePattern;
+  }
+
+  /** @param sourceDatePattern new {@code source_date_pattern} value, may be {@code null} */
+  public void setSourceDatePattern(String sourceDatePattern) {
+    this.sourceDatePattern = sourceDatePattern;
+  }
+
+  /** @param targetDatePattern new {@code target_date_pattern} value, may be {@code null} */
+  public void setTargetDatePattern(String targetDatePattern) {
+    this.targetDatePattern = targetDatePattern;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java
new file mode 100644
index 0000000..4a8d746
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldCopyDescriptorImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Gson-mapped implementation of {@link MapFieldCopyDescriptor}.
+ *
+ * Holds the {@code copy_name} property of a {@code map_fieldcopy} mapper entry.
+ * The value is {@code null} until set.
+ */
+public class MapFieldCopyDescriptorImpl implements MapFieldCopyDescriptor {
+  /** Value of the {@code copy_name} JSON property. */
+  @Expose
+  @SerializedName("copy_name")
+  private String copyName;
+
+  /** @return the constant JSON member name of this mapper, {@code "map_fieldcopy"} */
+  @Override
+  public String getJsonName() {
+    return "map_fieldcopy";
+  }
+
+  /** @return the {@code copy_name} value, or {@code null} if it was never set */
+  @Override
+  public String getCopyName() {
+    return copyName;
+  }
+
+  /** @param copyName new {@code copy_name} value, may be {@code null} */
+  public void setCopyName(String copyName) {
+    this.copyName = copyName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java
new file mode 100644
index 0000000..333cb67
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldNameDescriptorImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Gson-mapped implementation of {@link MapFieldNameDescriptor}.
+ *
+ * Holds the {@code new_fieldname} property of a {@code map_fieldname} mapper
+ * entry. The value is {@code null} until set.
+ */
+public class MapFieldNameDescriptorImpl implements MapFieldNameDescriptor {
+  /** Value of the {@code new_fieldname} JSON property. */
+  @Expose
+  @SerializedName("new_fieldname")
+  private String newFieldName;
+
+  /** @return the constant JSON member name of this mapper, {@code "map_fieldname"} */
+  @Override
+  public String getJsonName() {
+    return "map_fieldname";
+  }
+
+  /** @return the {@code new_fieldname} value, or {@code null} if it was never set */
+  @Override
+  public String getNewFieldName() {
+    return newFieldName;
+  }
+
+  /** @param newFieldName new {@code new_fieldname} value, may be {@code null} */
+  public void setNewFieldName(String newFieldName) {
+    this.newFieldName = newFieldName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java
new file mode 100644
index 0000000..599e152
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/MapFieldValueDescriptorImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Gson-mapped implementation of {@link MapFieldValueDescriptor}.
+ *
+ * Holds the {@code pre_value} and {@code post_value} properties of a
+ * {@code map_fieldvalue} mapper entry. Both values are {@code null} until set.
+ */
+public class MapFieldValueDescriptorImpl implements MapFieldValueDescriptor {
+  /** Value of the {@code pre_value} JSON property. */
+  @Expose
+  @SerializedName("pre_value")
+  private String preValue;
+
+  /** Value of the {@code post_value} JSON property. */
+  @Expose
+  @SerializedName("post_value")
+  private String postValue;
+
+  /** @return the constant JSON member name of this mapper, {@code "map_fieldvalue"} */
+  @Override
+  public String getJsonName() {
+    return "map_fieldvalue";
+  }
+
+  /** @return the {@code pre_value} value, or {@code null} if it was never set */
+  @Override
+  public String getPreValue() {
+    return preValue;
+  }
+
+  /** @return the {@code post_value} value, or {@code null} if it was never set */
+  @Override
+  public String getPostValue() {
+    return postValue;
+  }
+
+  /** @param preValue new {@code pre_value} value, may be {@code null} */
+  public void setPreValue(String preValue) {
+    this.preValue = preValue;
+  }
+
+  /** @param postValue new {@code post_value} value, may be {@code null} */
+  public void setPostValue(String postValue) {
+    this.postValue = postValue;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
new file mode 100644
index 0000000..32aded8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesAdapter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSerializationContext;
+
+public class PostMapValuesAdapter implements JsonDeserializer<List<PostMapValuesImpl>>, JsonSerializer<List<PostMapValuesImpl>> {
+ @Override
+ public List<PostMapValuesImpl> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) {
+ List<PostMapValuesImpl> vals = new ArrayList<>();
+ if (json.isJsonArray()) {
+ for (JsonElement e : json.getAsJsonArray()) {
+ vals.add(createPostMapValues(e, context));
+ }
+ } else if (json.isJsonObject()) {
+ vals.add(createPostMapValues(json, context));
+ } else {
+ throw new RuntimeException("Unexpected JSON type: " + json.getClass());
+ }
+ return vals;
+ }
+
+ private PostMapValuesImpl createPostMapValues(JsonElement e, JsonDeserializationContext context) {
+ List<MapFieldDescriptor> mappers = new ArrayList<>();
+ for (Map.Entry<String, JsonElement> m : e.getAsJsonObject().entrySet()) {
+ switch (m.getKey()) {
+ case "map_date":
+ mappers.add((MapDateDescriptorImpl)context.deserialize(m.getValue(), MapDateDescriptorImpl.class));
+ break;
+ case "map_fieldcopy":
+ mappers.add((MapFieldCopyDescriptorImpl)context.deserialize(m.getValue(), MapFieldCopyDescriptorImpl.class));
+ break;
+ case "map_fieldname":
+ mappers.add((MapFieldNameDescriptorImpl)context.deserialize(m.getValue(), MapFieldNameDescriptorImpl.class));
+ break;
+ case "map_fieldvalue":
+ mappers.add((MapFieldValueDescriptorImpl)context.deserialize(m.getValue(), MapFieldValueDescriptorImpl.class));
+ break;
+ default:
+ System.out.println("Unknown key: " + m.getKey());
+ }
+ }
+
+ PostMapValuesImpl postMapValues = new PostMapValuesImpl();
+ postMapValues.setMappers(mappers);
+ return postMapValues;
+ }
+
+ @Override
+ public JsonElement serialize(List<PostMapValuesImpl> src, Type typeOfSrc, JsonSerializationContext context) {
+ if (src.size() == 1) {
+ return createMapperObject(src.get(0), context);
+ } else {
+ JsonArray jsonArray = new JsonArray();
+ for (PostMapValuesImpl postMapValues : src) {
+ jsonArray.add(createMapperObject(postMapValues, context));
+ }
+ return jsonArray;
+ }
+ }
+
+ private JsonElement createMapperObject(PostMapValuesImpl postMapValues, JsonSerializationContext context) {
+ JsonObject jsonObject = new JsonObject();
+ for (MapFieldDescriptor m : postMapValues.getMappers()) {
+ jsonObject.add(((MapFieldDescriptor)m).getJsonName(), context.serialize(m));
+ }
+ return jsonObject;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java
new file mode 100644
index 0000000..4d2254a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/PostMapValuesImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl;
+
+import java.util.List;
+
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
+
+import com.google.gson.annotations.Expose;
+
+/**
+ * Implementation of {@link PostMapValues} holding a list of mapper descriptors.
+ *
+ * The list is {@code null} until {@link #setMappers(List)} is called.
+ */
+public class PostMapValuesImpl implements PostMapValues {
+  /** The mapper descriptors of this entry; {@code null} until set. */
+  @Expose
+  private List<MapFieldDescriptor> mappers;
+
+  /** @param mappers the mapper descriptors to store; the list is kept by reference, not copied */
+  public void setMappers(List<MapFieldDescriptor> mappers) {
+    this.mappers = mappers;
+  }
+
+  /** @return the stored mapper list itself (not a copy), or {@code null} if it was never set */
+  public List<MapFieldDescriptor> getMappers() {
+    return mappers;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index c853f42..8d7c69f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -34,7 +34,7 @@ import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.metrics.MetricsManager;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.SSLUtil;
-import org.apache.curator.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.Maps;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
index 68897e8..cfcc199 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
@@ -20,54 +20,19 @@
package org.apache.ambari.logfeeder.common;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
-public abstract class ConfigBlock {
- private static final Logger LOG = Logger.getLogger(ConfigBlock.class);
-
- private boolean drain = false;
-
+public abstract class ConfigBlock extends ConfigItem {
protected Map<String, Object> configs;
protected Map<String, String> contextFields = new HashMap<String, String>();
- public MetricData statMetric = new MetricData(getStatMetricName(), false);
- protected String getStatMetricName() {
- return null;
- }
-
public ConfigBlock() {
}
- /**
- * Used while logging. Keep it short and meaningful
- */
- public abstract String getShortDescription();
-
- /**
- * Every implementor need to give name to the thread they create
- */
- public String getNameForThread() {
- return this.getClass().getSimpleName();
- }
-
- public void addMetricsContainers(List<MetricData> metricsList) {
- metricsList.add(statMetric);
- }
-
- /**
- * This method needs to be overwritten by deriving classes.
- */
- public void init() throws Exception {
- }
-
public void loadConfig(Map<String, Object> map) {
configs = LogFeederUtil.cloneObject(map);
@@ -81,46 +46,6 @@ public abstract class ConfigBlock {
return configs;
}
- @SuppressWarnings("unchecked")
- public boolean isEnabled() {
- boolean isEnabled = getBooleanValue("is_enabled", true);
- if (isEnabled) {
- // Let's check for static conditions
- Map<String, Object> conditions = (Map<String, Object>) configs.get("conditions");
- boolean allow = true;
- if (MapUtils.isNotEmpty(conditions)) {
- allow = false;
- for (String conditionType : conditions.keySet()) {
- if (conditionType.equalsIgnoreCase("fields")) {
- Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
- for (String fieldName : fields.keySet()) {
- Object values = fields.get(fieldName);
- if (values instanceof String) {
- allow = isFieldConditionMatch(fieldName, (String) values);
- } else {
- List<String> listValues = (List<String>) values;
- for (String stringValue : listValues) {
- allow = isFieldConditionMatch(fieldName, stringValue);
- if (allow) {
- break;
- }
- }
- }
- if (allow) {
- break;
- }
- }
- }
- if (allow) {
- break;
- }
- }
- isEnabled = allow;
- }
- }
- return isEnabled;
- }
-
public boolean isFieldConditionMatch(String fieldName, String stringValue) {
boolean allow = false;
String fieldValue = (String) configs.get(fieldName);
@@ -207,27 +132,17 @@ public abstract class ConfigBlock {
return retValue;
}
- public Map<String, String> getContextFields() {
- return contextFields;
- }
-
- public void incrementStat(int count) {
- statMetric.value += count;
- }
-
- public void logStatForMetric(MetricData metric, String prefixStr) {
- LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription());
+ @Override
+ public boolean isEnabled() {
+ return getBooleanValue("is_enabled", true);
}
- public synchronized void logStat() {
- logStatForMetric(statMetric, "Stat");
+ public Map<String, String> getContextFields() {
+ return contextFields;
}
public boolean logConfigs(Priority level) {
- if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) {
- return false;
- }
- if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) {
+ if (!super.logConfigs(level)) {
return false;
}
LOG.log(level, "Printing configuration Block=" + getShortDescription());
@@ -235,12 +150,4 @@ public abstract class ConfigBlock {
LOG.log(level, "contextFields=" + contextFields);
return true;
}
-
- public boolean isDrain() {
- return drain;
- }
-
- public void setDrain(boolean drain) {
- this.drain = drain;
- }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index effe980..726ff27 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -46,13 +46,19 @@ import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterDescriptorImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import com.google.common.collect.ImmutableMap;
import com.google.gson.reflect.TypeToken;
public class ConfigHandler implements InputConfigMonitor {
@@ -61,10 +67,11 @@ public class ConfigHandler implements InputConfigMonitor {
private final OutputManager outputManager = new OutputManager();
private final InputManager inputManager = new InputManager();
- public static Map<String, Object> globalConfigs = new HashMap<>();
+ private final Map<String, Object> globalConfigs = new HashMap<>();
+ private final List<String> globalConfigJsons = new ArrayList<String>();
- private final List<Map<String, Object>> inputConfigList = new ArrayList<>();
- private final List<Map<String, Object>> filterConfigList = new ArrayList<>();
+ private final List<InputDescriptor> inputConfigList = new ArrayList<>();
+ private final List<FilterDescriptor> filterConfigList = new ArrayList<>();
private final List<Map<String, Object>> outputConfigList = new ArrayList<>();
private boolean simulateMode = false;
@@ -141,11 +148,12 @@ public class ConfigHandler implements InputConfigMonitor {
}
@Override
- public void loadInputConfigs(String serviceName, String inputConfigData) throws Exception {
+ public void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception {
inputConfigList.clear();
filterConfigList.clear();
- loadConfigs(inputConfigData);
+ inputConfigList.addAll(inputConfig.getInput());
+ filterConfigList.addAll(inputConfig.getFilter());
if (simulateMode) {
InputSimulate.loadTypeToFilePath(inputConfigList);
@@ -173,14 +181,7 @@ public class ConfigHandler implements InputConfigMonitor {
switch (key) {
case "global" :
globalConfigs.putAll((Map<String, Object>) configMap.get(key));
- break;
- case "input" :
- List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key);
- inputConfigList.addAll(inputConfig);
- break;
- case "filter" :
- List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key);
- filterConfigList.addAll(filterConfig);
+ globalConfigJsons.add(configData);
break;
case "output" :
List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
@@ -192,21 +193,28 @@ public class ConfigHandler implements InputConfigMonitor {
}
}
+ /**
+ * Returns the list that collects a config data string each time a "global" section is loaded.
+ * The internal list itself is returned (no copy), so callers see later additions and can
+ * modify it.
+ */
+ @Override
+ public List<String> getGlobalConfigJsons() {
+ return globalConfigJsons;
+ }
+
private void simulateIfNeeded() throws Exception {
int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0);
if (simulatedInputNumber == 0)
return;
- List<Map<String, Object>> simulateInputConfigList = new ArrayList<>();
+ InputConfigImpl simulateInputConfig = new InputConfigImpl();
+ List<InputDescriptorImpl> inputConfigDescriptors = new ArrayList<>();
+ simulateInputConfig.setInput(inputConfigDescriptors);
+ simulateInputConfig.setFilter(new ArrayList<FilterDescriptorImpl>());
for (int i = 0; i < simulatedInputNumber; i++) {
- HashMap<String, Object> mapList = new HashMap<String, Object>();
- mapList.put("source", "simulate");
- mapList.put("rowtype", "service");
- simulateInputConfigList.add(mapList);
+ InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
+ inputDescriptor.setSource("simulate");
+ inputDescriptor.setRowtype("service");
+ inputDescriptor.setAddFields(new HashMap<String, String>());
+ inputConfigDescriptors.add(inputDescriptor);
}
- Map<String, List<Map<String, Object>>> simulateInputConfigMap = ImmutableMap.of("input", simulateInputConfigList);
- String simulateInputConfig = LogFeederUtil.getGson().toJson(simulateInputConfigMap);
loadInputConfigs("Simulation", simulateInputConfig);
simulateMode = true;
@@ -233,7 +241,7 @@ public class ConfigHandler implements InputConfigMonitor {
output.loadConfig(map);
// We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
- if (output.getBooleanValue("is_enabled", true)) {
+ if (output.isEnabled()) {
output.logConfigs(Level.INFO);
outputManager.add(output);
} else {
@@ -243,24 +251,23 @@ public class ConfigHandler implements InputConfigMonitor {
}
private void loadInputs(String serviceName) {
- for (Map<String, Object> map : inputConfigList) {
- if (map == null) {
+ for (InputDescriptor inputDescriptor : inputConfigList) {
+ if (inputDescriptor == null) {
continue;
}
- mergeBlocks(globalConfigs, map);
- String value = (String) map.get("source");
- if (StringUtils.isEmpty(value)) {
+ String source = (String) inputDescriptor.getSource();
+ if (StringUtils.isEmpty(source)) {
LOG.error("Input block doesn't have source element");
continue;
}
- Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT);
+ Input input = (Input) AliasUtil.getClassInstance(source, AliasType.INPUT);
if (input == null) {
LOG.error("Input object could not be found");
continue;
}
- input.setType(value);
- input.loadConfig(map);
+ input.setType(source);
+ input.loadConfig(inputDescriptor);
if (input.isEnabled()) {
input.setOutputManager(outputManager);
@@ -278,13 +285,20 @@ public class ConfigHandler implements InputConfigMonitor {
List<Input> toRemoveInputList = new ArrayList<Input>();
for (Input input : inputManager.getInputList(serviceName)) {
- for (Map<String, Object> map : filterConfigList) {
- if (map == null) {
+ for (FilterDescriptor filterDescriptor : filterConfigList) {
+ if (filterDescriptor == null) {
+ continue;
+ }
+ if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
+ LOG.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled");
+ continue;
+ }
+ if (!input.isFilterRequired(filterDescriptor)) {
+ LOG.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription());
continue;
}
- mergeBlocks(globalConfigs, map);
- String value = (String) map.get("filter");
+ String value = filterDescriptor.getFilter();
if (StringUtils.isEmpty(value)) {
LOG.error("Filter block doesn't have filter element");
continue;
@@ -294,16 +308,12 @@ public class ConfigHandler implements InputConfigMonitor {
LOG.error("Filter object could not be found");
continue;
}
- filter.loadConfig(map);
+ filter.loadConfig(filterDescriptor);
filter.setInput(input);
- if (filter.isEnabled()) {
- filter.setOutputManager(outputManager);
- input.addFilter(filter);
- filter.logConfigs(Level.INFO);
- } else {
- LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
- }
+ filter.setOutputManager(outputManager);
+ input.addFilter(filter);
+ filter.logConfigs(Level.INFO);
}
if (input.getFirstFilter() == null) {
@@ -318,43 +328,25 @@ public class ConfigHandler implements InputConfigMonitor {
}
+ /**
+ * Sorts filterConfigList in place, ascending by each descriptor's sort order.
+ */
private void sortFilters() {
- Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() {
-
+ Collections.sort(filterConfigList, new Comparator<FilterDescriptor>() {
@Override
- public int compare(Map<String, Object> o1, Map<String, Object> o2) {
- Object o1Sort = o1.get("sort_order");
- Object o2Sort = o2.get("sort_order");
+ public int compare(FilterDescriptor o1, FilterDescriptor o2) {
+ Integer o1Sort = o1.getSortOrder();
+ Integer o2Sort = o2.getSortOrder();
+ // NOTE(review): a descriptor without a sort order compares as equal to every other one,
+ // which is not a transitive ordering; kept as is to preserve the existing behaviour.
if (o1Sort == null || o2Sort == null) {
return 0;
}
- int o1Value = parseSort(o1, o1Sort);
- int o2Value = parseSort(o2, o2Sort);
-
- return o1Value - o2Value;
- }
-
- private int parseSort(Map<String, Object> map, Object o) {
- if (!(o instanceof Number)) {
- try {
- return (new Double(Double.parseDouble(o.toString()))).intValue();
- } catch (Throwable t) {
- LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString()
- + ", map=" + map.toString());
- return 0;
- }
- } else {
- return ((Number) o).intValue();
- }
+ // Integer.compare cannot overflow, unlike "o1Sort - o2Sort", which yields the wrong sign
+ // when the two values are far apart (e.g. a large positive and a large negative order).
+ return Integer.compare(o1Sort, o2Sort);
}
- });
+ } );
}
private void assignOutputsToInputs(String serviceName) {
Set<Output> usedOutputSet = new HashSet<Output>();
for (Input input : inputManager.getInputList(serviceName)) {
for (Output output : outputManager.getOutputs()) {
- if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) {
+ if (input.isOutputRequired(output)) {
usedOutputSet.add(output);
input.addOutput(output);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
new file mode 100644
index 0000000..5c20a8e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.common;
+
+import java.util.List;
+
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Priority;
+
+/**
+ * Abstract base class holding the state shared by its subclasses: a common logger,
+ * a drain flag and one statistics metric. In this change both Filter and Input extend it.
+ */
+public abstract class ConfigItem {
+
+ // The logger used to be created for ConfigBlock (copy/paste), which attributed every message
+ // logged through this class and its subclasses to the wrong category.
+ protected static final Logger LOG = Logger.getLogger(ConfigItem.class);
+ private boolean drain = false;
+ // NOTE(review): getStatMetricName() is overridable and is invoked while the object is still
+ // being constructed, so overrides must not rely on subclass fields - confirm for implementors.
+ public MetricData statMetric = new MetricData(getStatMetricName(), false);
+
+ public ConfigItem() {
+ super();
+ }
+
+ /**
+ * Name passed to the MetricData created for statMetric. This base implementation returns null.
+ */
+ protected String getStatMetricName() {
+ return null;
+ }
+
+ /**
+ * Used while logging. Keep it short and meaningful
+ */
+ public abstract String getShortDescription();
+
+ /**
+ * Every implementor need to give name to the thread they create
+ *
+ * @return the simple class name of the concrete class, unless overridden
+ */
+ public String getNameForThread() {
+ return this.getClass().getSimpleName();
+ }
+
+ /**
+ * Appends this item's statMetric to the given list.
+ *
+ * @param metricsList list that receives the metric; it is modified in place
+ */
+ public void addMetricsContainers(List<MetricData> metricsList) {
+ metricsList.add(statMetric);
+ }
+
+ /**
+ * This method needs to be overwritten by deriving classes.
+ * The base implementation does nothing.
+ */
+ public void init() throws Exception {
+ }
+
+ /**
+ * @return whether this item is enabled; the rule is defined by each subclass
+ */
+ public abstract boolean isEnabled();
+
+ /**
+ * Adds count to statMetric.value. This method is not synchronized.
+ */
+ public void incrementStat(int count) {
+ statMetric.value += count;
+ }
+
+ /**
+ * Delegates to LogFeederUtil.logStatForMetric, appending ", key=" plus the short description
+ * of this item as the postfix.
+ */
+ public void logStatForMetric(MetricData metric, String prefixStr) {
+ LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription());
+ }
+
+ /**
+ * Logs statMetric with the prefix "Stat".
+ */
+ public synchronized void logStat() {
+ logStatForMetric(statMetric, "Stat");
+ }
+
+ /**
+ * Tells whether configuration details should be logged at the given level. Nothing is logged
+ * here; subclasses call this first and print their own details when it returns true.
+ *
+ * @param level the level the caller intends to log at
+ * @return false when level is INFO or DEBUG and that level is disabled for LOG, true otherwise
+ */
+ public boolean logConfigs(Priority level) {
+ if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) {
+ return false;
+ }
+ if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * @return the current value of the drain flag (false until setDrain is called)
+ */
+ public boolean isDrain() {
+ return drain;
+ }
+
+ /**
+ * @param drain new value of the drain flag
+ */
+ public void setDrain(boolean drain) {
+ this.drain = drain;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index afd903e..fd02497 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.ambari.logfeeder.common.ConfigBlock;
+import org.apache.ambari.logfeeder.common.ConfigItem;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
@@ -33,18 +33,28 @@ import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.ambari.logfeeder.util.AliasUtil;
import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
-import org.apache.log4j.Logger;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
+import org.apache.commons.lang.BooleanUtils;
import org.apache.log4j.Priority;
-public abstract class Filter extends ConfigBlock {
- private static final Logger LOG = Logger.getLogger(Filter.class);
-
+public abstract class Filter extends ConfigItem {
+ protected FilterDescriptor filterDescriptor;
protected Input input;
private Filter nextFilter = null;
private OutputManager outputManager;
private Map<String, List<Mapper>> postFieldValueMappers = new HashMap<String, List<Mapper>>();
+ /**
+ * Stores the given descriptor as this filter's configuration. The reference is kept as is;
+ * nothing is validated or copied here.
+ */
+ public void loadConfig(FilterDescriptor filterDescriptor) {
+ this.filterDescriptor = filterDescriptor;
+ }
+
+ /**
+ * @return the descriptor last passed to loadConfig, or null if none has been set
+ */
+ public FilterDescriptor getFilterDescriptor() {
+ return filterDescriptor;
+ }
+
@Override
public void init() throws Exception {
super.init();
@@ -55,28 +65,22 @@ public abstract class Filter extends ConfigBlock {
}
}
- @SuppressWarnings("unchecked")
private void initializePostMapValues() {
- Map<String, Object> postMapValues = (Map<String, Object>) getConfigValue("post_map_values");
+ Map<String, ? extends List<? extends PostMapValues>> postMapValues = filterDescriptor.getPostMapValues();
if (postMapValues == null) {
return;
}
for (String fieldName : postMapValues.keySet()) {
- List<Map<String, Object>> mapList = null;
- Object values = postMapValues.get(fieldName);
- if (values instanceof List<?>) {
- mapList = (List<Map<String, Object>>) values;
- } else {
- mapList = new ArrayList<Map<String, Object>>();
- mapList.add((Map<String, Object>) values);
- }
- for (Map<String, Object> mapObject : mapList) {
- for (String mapClassCode : mapObject.keySet()) {
+ List<? extends PostMapValues> values = postMapValues.get(fieldName);
+ for (PostMapValues pmv : values) {
+ for (MapFieldDescriptor mapFieldDescriptor : pmv.getMappers()) {
+ String mapClassCode = mapFieldDescriptor.getJsonName();
Mapper mapper = (Mapper) AliasUtil.getClassInstance(mapClassCode, AliasType.MAPPER);
if (mapper == null) {
- break;
+ LOG.warn("Unknown mapper type: " + mapClassCode);
+ continue;
}
- if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapObject.get(mapClassCode))) {
+ if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapFieldDescriptor)) {
List<Mapper> fieldMapList = postFieldValueMappers.get(fieldName);
if (fieldMapList == null) {
fieldMapList = new ArrayList<Mapper>();
@@ -156,15 +160,8 @@ public abstract class Filter extends ConfigBlock {
}
+ /**
+ * A filter counts as enabled unless its descriptor explicitly reports false
+ * (BooleanUtils.isNotFalse also accepts a null value).
+ * NOTE(review): filterDescriptor must already be set via loadConfig, otherwise this throws
+ * a NullPointerException.
+ */
@Override
- public boolean isFieldConditionMatch(String fieldName, String stringValue) {
- if (!super.isFieldConditionMatch(fieldName, stringValue)) {
- if (input != null) {
- return input.isFieldConditionMatch(fieldName, stringValue);
- } else {
- return false;
- }
- }
- return true;
+ public boolean isEnabled() {
+ return BooleanUtils.isNotFalse(filterDescriptor.isEnabled());
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 7e2da70..70aea65 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -38,6 +38,8 @@ import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
+import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -75,11 +77,10 @@ public class FilterGrok extends Filter {
super.init();
try {
- messagePattern = escapePattern(getStringValue("message_pattern"));
- multilinePattern = escapePattern(getStringValue("multiline_pattern"));
- sourceField = getStringValue("source_field");
- removeSourceField = getBooleanValue("remove_source_field",
- removeSourceField);
+ messagePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMessagePattern());
+ multilinePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMultilinePattern());
+ sourceField = ((FilterGrokDescriptor)filterDescriptor).getSourceField();
+ removeSourceField = BooleanUtils.toBooleanDefaultIfNull(filterDescriptor.isRemoveSourceField(), removeSourceField);
LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " +
getShortDescription());
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index 35f692e..cfccdeb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -25,12 +25,9 @@ import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.util.DateUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
public class FilterJSON extends Filter {
- private static final Logger LOG = Logger.getLogger(FilterJSON.class);
-
@Override
public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException {
Map<String, Object> jsonMap = null;
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index b04a439..f2a4186 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -28,13 +28,11 @@ import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
public class FilterKeyValue extends Filter {
- private static final Logger LOG = Logger.getLogger(FilterKeyValue.class);
-
private String sourceField = null;
private String valueSplit = "=";
private String fieldSplit = "\t";
@@ -46,10 +44,10 @@ public class FilterKeyValue extends Filter {
public void init() throws Exception {
super.init();
- sourceField = getStringValue("source_field");
- valueSplit = getStringValue("value_split", valueSplit);
- fieldSplit = getStringValue("field_split", fieldSplit);
- valueBorders = getStringValue("value_borders");
+ sourceField = filterDescriptor.getSourceField();
+ valueSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getValueSplit(), valueSplit);
+ fieldSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getFieldSplit(), fieldSplit);
+ valueBorders = ((FilterKeyValueDescriptor)filterDescriptor).getValueBorders();
LOG.info("init() done. source_field=" + sourceField + ", value_split=" + valueSplit + ", " + ", field_split=" +
fieldSplit + ", " + getShortDescription());
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
index 41a1fa5..cfa1903 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -29,14 +29,14 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
public abstract class AbstractInputFile extends Input {
- protected static final Logger LOG = Logger.getLogger(AbstractInputFile.class);
-
private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000;
protected File[] logFiles;
@@ -73,16 +73,16 @@ public abstract class AbstractInputFile extends Input {
// Let's close the file and set it to true after we start monitoring it
setClosed(true);
- logPath = getStringValue("path");
- tail = getBooleanValue("tail", tail);
- checkPointIntervalMS = getIntValue("checkpoint.interval.ms", DEFAULT_CHECKPOINT_INTERVAL_MS);
+ logPath = inputDescriptor.getPath();
+ tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), tail);
+ checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)inputDescriptor).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS);
if (StringUtils.isEmpty(logPath)) {
LOG.error("path is empty for file input. " + getShortDescription());
return;
}
- String startPosition = getStringValue("start_position");
+ String startPosition = inputDescriptor.getStartPosition();
if (StringUtils.isEmpty(startPosition) || startPosition.equalsIgnoreCase("beginning") ||
startPosition.equalsIgnoreCase("begining") || !tail) {
isStartFromBegining = true;
@@ -313,7 +313,7 @@ public abstract class AbstractInputFile extends Input {
+ /**
+ * Builds the short description "input:source=<source>, path=<path>", where the path is the
+ * absolute path of the first entry of logFiles when that array is non-empty, and the
+ * configured logPath otherwise.
+ */
@Override
public String getShortDescription() {
- return "input:source=" + getStringValue("source") + ", path=" +
+ return "input:source=" + inputDescriptor.getSource() + ", path=" +
(!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 9f54d8a..fba596d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -21,23 +21,25 @@ package org.apache.ambari.logfeeder.input;
import java.io.File;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ambari.logfeeder.input.cache.LRUCache;
-import org.apache.ambari.logfeeder.common.ConfigBlock;
+import org.apache.ambari.logfeeder.common.ConfigItem;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
-
-public abstract class Input extends ConfigBlock implements Runnable {
- private static final Logger LOG = Logger.getLogger(Input.class);
-
+import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.log4j.Priority;
+
+public abstract class Input extends ConfigItem implements Runnable {
private static final boolean DEFAULT_TAIL = true;
private static final boolean DEFAULT_USE_EVENT_MD5 = false;
private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
@@ -47,12 +49,8 @@ public abstract class Input extends ConfigBlock implements Runnable {
private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000;
private static final String DEFAULT_CACHE_KEY_FIELD = "log_message";
- private static final String CACHE_ENABLED = "cache_enabled";
- private static final String CACHE_KEY_FIELD = "cache_key_field";
- private static final String CACHE_LAST_DEDUP_ENABLED = "cache_last_dedup_enabled";
- private static final String CACHE_SIZE = "cache_size";
- private static final String CACHE_DEDUP_INTERVAL = "cache_dedup_interval";
-
+ protected InputDescriptor inputDescriptor;
+
protected InputManager inputManager;
protected OutputManager outputManager;
private List<Output> outputList = new ArrayList<Output>();
@@ -75,21 +73,12 @@ public abstract class Input extends ConfigBlock implements Runnable {
return null;
}
- @Override
- public void loadConfig(Map<String, Object> map) {
- super.loadConfig(map);
- String typeValue = getStringValue("type");
- if (typeValue != null) {
- // Explicitly add type and value to field list
- contextFields.put("type", typeValue);
- @SuppressWarnings("unchecked")
- Map<String, Object> addFields = (Map<String, Object>) map.get("add_fields");
- if (addFields == null) {
- addFields = new HashMap<String, Object>();
- map.put("add_fields", addFields);
- }
- addFields.put("type", typeValue);
- }
+ /**
+ * Stores the given descriptor as this input's configuration. The reference is kept as is;
+ * nothing is validated or copied here.
+ */
+ public void loadConfig(InputDescriptor inputDescriptor) {
+ this.inputDescriptor = inputDescriptor;
+ }
+
+ /**
+ * @return the descriptor last passed to loadConfig, or null if none has been set
+ */
+ public InputDescriptor getInputDescriptor() {
+ return inputDescriptor;
}
public void setType(String type) {
@@ -104,6 +93,12 @@ public abstract class Input extends ConfigBlock implements Runnable {
this.outputManager = outputManager;
}
+ /**
+ * Decides whether the given filter applies to this input: true when the type collection under
+ * the filter's conditions -> fields contains this input's type.
+ *
+ * A filter without conditions, fields or a type collection is reported as not required
+ * instead of failing with a NullPointerException; this matches how isOutputRequired treats
+ * missing conditions and fields.
+ *
+ * @param filterDescriptor the filter configuration to check, must not be null
+ * @return true if the filter lists this input's type, false otherwise
+ */
+ public boolean isFilterRequired(FilterDescriptor filterDescriptor) {
+ Conditions conditions = filterDescriptor.getConditions();
+ if (conditions == null) {
+ return false;
+ }
+
+ Fields fields = conditions.getFields();
+ if (fields == null || fields.getType() == null) {
+ return false;
+ }
+
+ return fields.getType().contains(inputDescriptor.getType());
+ }
+
public void addFilter(Filter filter) {
if (firstFilter == null) {
firstFilter = filter;
@@ -116,6 +111,22 @@ public abstract class Input extends ConfigBlock implements Runnable {
}
}
+ /**
+ * Decides whether the given output should receive data from this input: true when the
+ * "rowtype" list under the output's "conditions" -> "fields" config contains this input's
+ * row type.
+ *
+ * An output with no "conditions", no "fields" or no "rowtype" list is reported as not
+ * required; the missing "rowtype" case used to end in a NullPointerException.
+ *
+ * @param output the output whose raw config map is inspected
+ * @return true if the output lists this input's row type, false otherwise
+ */
+ @SuppressWarnings("unchecked")
+ public boolean isOutputRequired(Output output) {
+ Map<String, Object> conditions = (Map<String, Object>) output.getConfigs().get("conditions");
+ if (conditions == null) {
+ return false;
+ }
+
+ Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
+ if (fields == null) {
+ return false;
+ }
+
+ List<String> types = (List<String>) fields.get("rowtype");
+ return types != null && types.contains(inputDescriptor.getRowtype());
+ }
+
public void addOutput(Output output) {
outputList.add(output);
}
@@ -124,9 +135,9 @@ public abstract class Input extends ConfigBlock implements Runnable {
public void init() throws Exception {
super.init();
initCache();
- tail = getBooleanValue("tail", DEFAULT_TAIL);
- useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5);
- genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5);
+ tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), DEFAULT_TAIL);
+ useEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isUseEventMd5AsId(), DEFAULT_USE_EVENT_MD5);
+ genEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isGenEventMd5(), DEFAULT_GEN_EVENT_MD5);
if (firstFilter != null) {
firstFilter.init();
@@ -236,26 +247,26 @@ public abstract class Input extends ConfigBlock implements Runnable {
}
private void initCache() {
- boolean cacheEnabled = getConfigValue(CACHE_ENABLED) != null
- ? getBooleanValue(CACHE_ENABLED, DEFAULT_CACHE_ENABLED)
+ boolean cacheEnabled = inputDescriptor.isCacheEnabled() != null
+ ? inputDescriptor.isCacheEnabled()
: LogFeederUtil.getBooleanProperty("logfeeder.cache.enabled", DEFAULT_CACHE_ENABLED);
if (cacheEnabled) {
- String cacheKeyField = getConfigValue(CACHE_KEY_FIELD) != null
- ? getStringValue(CACHE_KEY_FIELD)
+ String cacheKeyField = inputDescriptor.getCacheKeyField() != null
+ ? inputDescriptor.getCacheKeyField()
: LogFeederUtil.getStringProperty("logfeeder.cache.key.field", DEFAULT_CACHE_KEY_FIELD);
- setCacheKeyField(getStringValue(cacheKeyField));
+ setCacheKeyField(cacheKeyField);
- boolean cacheLastDedupEnabled = getConfigValue(CACHE_LAST_DEDUP_ENABLED) != null
- ? getBooleanValue(CACHE_LAST_DEDUP_ENABLED, DEFAULT_CACHE_DEDUP_LAST)
+ boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null
+ ? inputDescriptor.getCacheLastDedupEnabled()
: LogFeederUtil.getBooleanProperty("logfeeder.cache.last.dedup.enabled", DEFAULT_CACHE_DEDUP_LAST);
- int cacheSize = getConfigValue(CACHE_SIZE) != null
- ? getIntValue(CACHE_SIZE, DEFAULT_CACHE_SIZE)
+ int cacheSize = inputDescriptor.getCacheSize() != null
+ ? inputDescriptor.getCacheSize()
: LogFeederUtil.getIntProperty("logfeeder.cache.size", DEFAULT_CACHE_SIZE);
- long cacheDedupInterval = getConfigValue(CACHE_DEDUP_INTERVAL) != null
- ? getLongValue(CACHE_DEDUP_INTERVAL, DEFAULT_CACHE_DEDUP_INTERVAL)
+ long cacheDedupInterval = inputDescriptor.getCacheDedupInterval() != null
+ ? inputDescriptor.getCacheDedupInterval()
: Long.parseLong(LogFeederUtil.getStringProperty("logfeeder.cache.dedup.interval", String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL)));
setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled));
@@ -319,6 +330,11 @@ public abstract class Input extends ConfigBlock implements Runnable {
}
@Override
+ public boolean isEnabled() {
+ return BooleanUtils.isNotFalse(inputDescriptor.isEnabled());
+ }
+
+ @Override
public String getNameForThread() {
if (filePath != null) {
try {
@@ -331,7 +347,17 @@ public abstract class Input extends ConfigBlock implements Runnable {
}
@Override
+ public boolean logConfigs(Priority level) {
+ if (!super.logConfigs(level)) {
+ return false;
+ }
+ LOG.log(level, "Printing Input=" + getShortDescription());
+ LOG.log(level, "description=" + inputDescriptor.getPath());
+ return true;
+ }
+
+ @Override
public String toString() {
return getShortDescription();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 3737839..fc40ca4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -25,7 +25,9 @@ import java.io.FileNotFoundException;
import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.solr.common.util.Base64;
@@ -62,7 +64,7 @@ public class InputFile extends AbstractInputFile {
@Override
void start() throws Exception {
- boolean isProcessFile = getBooleanValue("process_file", true);
+ boolean isProcessFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getProcessFile(), true);
if (isProcessFile) {
if (tail) {
processFile(logFiles[0]);
@@ -100,7 +102,7 @@ public class InputFile extends AbstractInputFile {
}
private void copyFiles(File[] files) {
- boolean isCopyFile = getBooleanValue("copy_file", false);
+ boolean isCopyFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getCopyFile(), false);
if (isCopyFile && files != null) {
for (File file : files) {
try {
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index f560379..4bf162b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import org.apache.ambari.logfeeder.util.S3Util;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
import org.apache.commons.lang.ArrayUtils;
import org.apache.solr.common.util.Base64;
@@ -78,8 +79,8 @@ public class InputS3File extends AbstractInputFile {
@Override
protected BufferedReader openLogFile(File logPathFile) throws IOException {
- String s3AccessKey = getStringValue("s3_access_key");
- String s3SecretKey = getStringValue("s3_secret_key");
+ String s3AccessKey = ((InputS3FileDescriptor)inputDescriptor).getS3AccessKey();
+ String s3SecretKey = ((InputS3FileDescriptor)inputDescriptor).getS3SecretKey();
BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3AccessKey, s3SecretKey);
fileKey = getFileKey(logPathFile);
base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index d193cdb..5e7bdb3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.input;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -35,25 +34,23 @@ import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.filter.FilterJSON;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
import org.apache.commons.collections.MapUtils;
-import org.apache.log4j.Logger;
import org.apache.solr.common.util.Base64;
import com.google.common.base.Joiner;
public class InputSimulate extends Input {
- private static final Logger LOG = Logger.getLogger(InputSimulate.class);
-
private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}";
private static final Map<String, String> typeToFilePath = new HashMap<>();
- private static List<String> inputTypes = new ArrayList<>();
- public static void loadTypeToFilePath(List<Map<String, Object>> inputList) {
- for (Map<String, Object> input : inputList) {
- if (input.containsKey("type") && input.containsKey("path")) {
- typeToFilePath.put((String)input.get("type"), (String)input.get("path"));
- inputTypes.add((String)input.get("type"));
- }
+ private static final List<String> inputTypes = new ArrayList<>();
+ public static void loadTypeToFilePath(List<InputDescriptor> inputList) {
+ for (InputDescriptor input : inputList) {
+ typeToFilePath.put(input.getType(), input.getPath());
+ inputTypes.add(input.getType());
}
}
@@ -86,7 +83,7 @@ public class InputSimulate extends Input {
this.host = "#" + hostNumber.incrementAndGet() + "-" + LogFeederUtil.hostName;
Filter filter = new FilterJSON();
- filter.loadConfig(Collections.<String, Object> emptyMap());
+ filter.loadConfig(new FilterJsonDescriptorImpl());
filter.setInput(this);
addFilter(filter);
}
@@ -141,7 +138,7 @@ public class InputSimulate extends Input {
String type = types.get(typePos);
String filePath = MapUtils.getString(typeToFilePath, type, "path of " + type);
- configs.put("type", type);
+ ((InputDescriptorImpl)inputDescriptor).setType(type);
setFilePath(filePath);
return type;
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
index 1f635af..6173f53 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/FilterLogData.java
@@ -44,7 +44,7 @@ public enum FilterLogData {
}
public boolean isAllowed(Map<String, Object> jsonObj, InputMarker inputMarker) {
- if ("audit".equals(inputMarker.input.getConfigs().get(LogFeederConstants.ROW_TYPE)))
+ if ("audit".equals(inputMarker.input.getInputDescriptor().getRowtype()))
return true;
boolean isAllowed = applyFilter(jsonObj);
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
index 96709c0..5facf76 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
@@ -21,12 +21,14 @@ package org.apache.ambari.logfeeder.mapper;
import java.util.Map;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+
public abstract class Mapper {
private String inputDesc;
protected String fieldName;
private String mapClassCode;
- public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs);
+ public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor);
protected void init(String inputDesc, String fieldName, String mapClassCode) {
this.inputDesc = inputDesc;
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index 6a7fad7..5d34c06 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -26,6 +26,8 @@ import java.util.Map;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
@@ -39,18 +41,11 @@ public class MapperDate extends Mapper {
private SimpleDateFormat srcDateFormatter=null;
@Override
- public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+ public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
init(inputDesc, fieldName, mapClassCode);
- if (!(mapConfigs instanceof Map)) {
- LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName() +
- ", map=" + this);
- return false;
- }
- @SuppressWarnings("unchecked")
- Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
- String targetDateFormat = (String) mapObjects.get("target_date_pattern");
- String srcDateFormat = (String) mapObjects.get("src_date_pattern");
+ String targetDateFormat = ((MapDateDescriptor)mapFieldDescriptor).getTargetDatePattern();
+ String srcDateFormat = ((MapDateDescriptor)mapFieldDescriptor).getSourceDatePattern();
if (StringUtils.isEmpty(targetDateFormat)) {
LOG.fatal("Date format for map is empty. " + this);
} else {
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
index 39e1ff4..a463f49 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
@@ -21,6 +21,8 @@ package org.apache.ambari.logfeeder.mapper;
import java.util.Map;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
@@ -33,16 +35,9 @@ public class MapperFieldCopy extends Mapper {
private String copyName = null;
@Override
- public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+ public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
init(inputDesc, fieldName, mapClassCode);
- if (!(mapConfigs instanceof Map)) {
- LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
- return false;
- }
-
- @SuppressWarnings("unchecked")
- Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
- copyName = (String) mapObjects.get("copy_name");
+ copyName = ((MapFieldCopyDescriptor)mapFieldDescriptor).getCopyName();
if (StringUtils.isEmpty(copyName)) {
LOG.fatal("Map copy name is empty.");
return false;
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
index 9b6e83c..3f160da 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -22,6 +22,8 @@ package org.apache.ambari.logfeeder.mapper;
import java.util.Map;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -35,16 +37,10 @@ public class MapperFieldName extends Mapper {
private String newValue = null;
@Override
- public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+ public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
init(inputDesc, fieldName, mapClassCode);
- if (!(mapConfigs instanceof Map)) {
- LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
- return false;
- }
-
- @SuppressWarnings("unchecked")
- Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
- newValue = (String) mapObjects.get("new_fieldname");
+
+ newValue = ((MapFieldNameDescriptor)mapFieldDescriptor).getNewFieldName();
if (StringUtils.isEmpty(newValue)) {
LOG.fatal("Map field value is empty.");
return false;
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
index 87cda65..03ff95b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -22,6 +22,8 @@ package org.apache.ambari.logfeeder.mapper;
import java.util.Map;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -36,17 +38,11 @@ public class MapperFieldValue extends Mapper {
private String newValue = null;
@Override
- public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+ public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
init(inputDesc, fieldName, mapClassCode);
- if (!(mapConfigs instanceof Map)) {
- LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
- return false;
- }
- @SuppressWarnings("unchecked")
- Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
- prevValue = (String) mapObjects.get("pre_value");
- newValue = (String) mapObjects.get("post_value");
+ prevValue = ((MapFieldValueDescriptor)mapFieldDescriptor).getPreValue();
+ newValue = ((MapFieldValueDescriptor)mapFieldDescriptor).getPostValue();;
if (StringUtils.isEmpty(newValue)) {
LOG.fatal("Map field value is empty.");
return false;
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index bc6a553..65b9e19 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -28,11 +28,8 @@ import org.apache.ambari.logfeeder.common.ConfigBlock;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
public abstract class Output extends ConfigBlock {
- private static final Logger LOG = Logger.getLogger(Output.class);
-
private String destination = null;
protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false);
http://git-wip-us.apache.org/repos/asf/ambari/blob/fd4a7a46/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
index fcf2695..8308a4f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
@@ -41,7 +41,7 @@ public class OutputLineFilter {
public Boolean apply(Map<String, Object> lineMap, Input input) {
boolean isLogFilteredOut = false;
LRUCache inputLruCache = input.getCache();
- if (inputLruCache != null && "service".equals(input.getConfigs().get(LogFeederConstants.ROW_TYPE))) {
+ if (inputLruCache != null && "service".equals(input.getInputDescriptor().getRowtype())) {
String logMessage = (String) lineMap.get(input.getCacheKeyField());
Long timestamp = null;
if (lineMap.containsKey((LogFeederConstants.IN_MEMORY_TIMESTAMP))) {