You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/10 19:35:08 UTC
[3/3] ambari git commit: AMBARI-17785. Provide support for S3 as a
first class destination for log events (Hemanth Yamijala via oleewere)
AMBARI-17785. Provide support for S3 as a first class destination for log events (Hemanth Yamijala via oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/26e5fe0a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/26e5fe0a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/26e5fe0a
Branch: refs/heads/branch-2.5
Commit: 26e5fe0a9d63cadc9e19be0bac17a507224d0060
Parents: 23bd337
Author: oleewere <ol...@gmail.com>
Authored: Tue Aug 9 15:08:13 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Sat Sep 10 21:27:45 2016 +0200
----------------------------------------------------------------------
.../ambari/logfeeder/input/InputMarker.java | 1 -
.../ambari/logfeeder/output/OutputS3File.java | 227 ++++++++++++-------
.../logfeeder/output/S3LogPathResolver.java | 54 +++++
.../logfeeder/output/S3OutputConfiguration.java | 114 ++++++++++
.../ambari/logfeeder/output/S3Uploader.java | 163 +++++++++++++
.../logfeeder/output/spool/LogSpooler.java | 91 +++++++-
.../org/apache/ambari/logfeeder/s3/S3Util.java | 8 +-
.../logfeeder/output/OutputS3FileTest.java | 198 ++++++++++++++++
.../logfeeder/output/S3LogPathResolverTest.java | 51 +++++
.../ambari/logfeeder/output/S3UploaderTest.java | 164 ++++++++++++++
.../logfeeder/output/spool/LogSpoolerTest.java | 182 ++++++++++++---
11 files changed, 1123 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
index 8def4b9..48a7f1d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
@@ -32,5 +32,4 @@ public class InputMarker {
return "InputMarker [lineNumber=" + lineNumber + ", input="
+ input.getShortDescription() + "]";
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index f42195c..cbc1045 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -18,83 +18,95 @@
*/
package org.apache.ambari.logfeeder.output;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import org.apache.ambari.logfeeder.LogFeeder;
import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.spool.LogSpooler;
+import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
+import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
+import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
import org.apache.ambari.logfeeder.s3.S3Util;
-import org.apache.ambari.logfeeder.util.CompressionUtil;
-import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.util.*;
+import java.util.Map.Entry;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
/**
- * Write log file into s3 bucket
+ * Write log file into s3 bucket.
+ *
+ * This class supports two modes of upload:
+ * <ul>
+ * <li>A one time upload of files matching a pattern</li>
+ * <li>A batch mode, asynchronous, periodic upload of files</li>
+ * </ul>
*/
-public class OutputS3File extends Output {
+public class OutputS3File extends Output implements RolloverCondition, RolloverHandler {
+
+ public static final String INPUT_ATTRIBUTE_TYPE = "type";
+ public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json";
+ static private Logger logger = Logger.getLogger(OutputS3File.class);
+
+ private LogSpooler logSpooler;
+ private S3OutputConfiguration s3OutputConfiguration;
+ private S3Uploader s3Uploader;
+ @Override
+ public void init() throws Exception {
+ super.init();
+ s3OutputConfiguration = S3OutputConfiguration.fromConfigBlock(this);
+ }
private static boolean uploadedGlobalConfig = false;
+ /**
+ * Copy local log files and corresponding config to S3 bucket one time.
+ * @param inputFile The file to be copied
+ * @param inputMarker Contains information about the configuration to be uploaded.
+ */
@Override
public void copyFile(File inputFile, InputMarker inputMarker) {
- String bucketName = getStringValue("s3_bucket");
- String s3LogDir = getStringValue("s3_log_dir");
- HashMap<String, String> contextParam = buildContextParam();
- s3LogDir = PlaceholderUtil.replaceVariables(s3LogDir, contextParam);
- String s3AccessKey = getStringValue("s3_access_key");
- String s3SecretKey = getStringValue("s3_secret_key");
- String compressionAlgo = getStringValue("compression_algo");
- String fileName = inputFile.getName();
- // create tmp compressed File
- String tmpDir = LogFeederUtil.getLogfeederTempDir();
- File outputFile = new File(tmpDir + fileName + "_"
- + new Date().getTime() + "." + compressionAlgo);
- outputFile = CompressionUtil.compressFile(inputFile, outputFile,
- compressionAlgo);
- String type = inputMarker.input.getStringValue("type");
- String s3Path = s3LogDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + type
- + S3Util.INSTANCE.S3_PATH_SEPARATOR + fileName + "."
- + compressionAlgo;
- S3Util.INSTANCE.uploadFileTos3(bucketName, s3Path, outputFile, s3AccessKey,
- s3SecretKey);
- // delete local compressed file
- outputFile.deleteOnExit();
- ArrayList<Map<String, Object>> filters = new ArrayList<Map<String, Object>>();
+ String type = inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE);
+ S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration,
+ S3Util.INSTANCE, false, type);
+ String resolvedPath = s3Uploader.uploadFile(inputFile,
+ inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
+
+ uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath);
+ }
+
+ private void uploadConfig(InputMarker inputMarker, String type,
+ S3OutputConfiguration s3OutputConfiguration, String resolvedPath) {
+
+ ArrayList<Map<String, Object>> filters = new ArrayList<>();
addFilters(filters, inputMarker.input.getFirstFilter());
- Map<String, Object> inputConfig = new HashMap<String, Object>();
+ Map<String, Object> inputConfig = new HashMap<>();
inputConfig.putAll(inputMarker.input.getConfigs());
- String s3CompletePath = S3Util.INSTANCE.S3_PATH_START_WITH + bucketName
- + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Path;
+ String s3CompletePath = S3Util.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName()
+ + S3Util.S3_PATH_SEPARATOR + resolvedPath;
inputConfig.put("path", s3CompletePath);
- ArrayList<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
+ ArrayList<Map<String, Object>> inputConfigList = new ArrayList<>();
inputConfigList.add(inputConfig);
// set source s3_file
// remove global config from filter config
removeGlobalConfig(inputConfigList);
removeGlobalConfig(filters);
// write config into s3 file
- String s3Key = getComponentConfigFileName(type);
- Map<String, Object> config = new HashMap<String, Object>();
+ Map<String, Object> config = new HashMap<>();
config.put("filter", filters);
config.put("input", inputConfigList);
- writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey, contextParam,
- s3Key);
+ writeConfigToS3(config, getComponentConfigFileName(type), s3OutputConfiguration);
// write global config
- writeGlobalConfig();
+ writeGlobalConfig(s3OutputConfiguration);
}
- public void addFilters(ArrayList<Map<String, Object>> filters, Filter filter) {
+ private void addFilters(ArrayList<Map<String, Object>> filters, Filter filter) {
if (filter != null) {
Map<String, Object> filterConfig = new HashMap<String, Object>();
filterConfig.putAll(filter.getConfigs());
@@ -105,38 +117,28 @@ public class OutputS3File extends Output {
}
}
- public void writeConfigToS3(Map<String, Object> config, String bucketName,
- String accessKey, String secretKey, HashMap<String, String> contextParam,
- String s3Key) {
- String s3ConfigDir = getStringValue("s3_config_dir");
- s3ConfigDir = PlaceholderUtil.replaceVariables(s3ConfigDir, contextParam);
+ private void writeConfigToS3(Map<String, Object> configToWrite, String s3KeySuffix,
+ S3OutputConfiguration s3OutputConfiguration) {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
- String configJson = gson.toJson(config);
+ String configJson = gson.toJson(configToWrite);
- s3Key = s3ConfigDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Key;
- S3Util.INSTANCE.writeIntoS3File(configJson, bucketName, s3Key, accessKey,
- secretKey);
- }
+ String s3ResolvedKey = new S3LogPathResolver().
+ getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix, s3OutputConfiguration.getCluster());
- public String getComponentConfigFileName(String componentName) {
- String fileName = "input.config-" + componentName + ".json";
- return fileName;
+ S3Util.INSTANCE.writeIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(),
+ s3ResolvedKey, s3OutputConfiguration.getS3AccessKey(),
+ s3OutputConfiguration.getS3SecretKey());
}
- public HashMap<String, String> buildContextParam() {
- HashMap<String, String> contextParam = new HashMap<String, String>();
- contextParam.put("host", LogFeederUtil.hostName);
- contextParam.put("ip", LogFeederUtil.ipAddress);
- String cluster = getNVList("add_fields").get("cluster");
- contextParam.put("cluster", cluster);
- return contextParam;
+ private String getComponentConfigFileName(String componentName) {
+ return "input.config-" + componentName + ".json";
}
-
+
private Map<String, Object> getGlobalConfig() {
Map<String, Object> globalConfig = LogFeeder.globalMap;
if (globalConfig == null) {
- globalConfig = new HashMap<String, Object>();
+ globalConfig = new HashMap<>();
}
return globalConfig;
}
@@ -163,7 +165,7 @@ public class OutputS3File extends Output {
* write global config in s3 file Invoke only once
*/
@SuppressWarnings("unchecked")
- private synchronized void writeGlobalConfig() {
+ private synchronized void writeGlobalConfig(S3OutputConfiguration s3OutputConfiguration) {
if (!uploadedGlobalConfig) {
Map<String, Object> globalConfig = LogFeederUtil.cloneObject(getGlobalConfig());
//updating global config before write to s3
@@ -174,7 +176,7 @@ public class OutputS3File extends Output {
Map<String, Object> addFields = (Map<String, Object>) globalConfig
.get("add_fields");
if (addFields == null) {
- addFields = new HashMap<String, Object>();
+ addFields = new HashMap<>();
}
addFields.put("ip", LogFeederUtil.ipAddress);
addFields.put("host", LogFeederUtil.hostName);
@@ -189,20 +191,85 @@ public class OutputS3File extends Output {
globalConfig.put("add_fields", addFields);
Map<String, Object> config = new HashMap<String, Object>();
config.put("global", globalConfig);
- String s3AccessKey = getStringValue("s3_access_key");
- String s3SecretKey = getStringValue("s3_secret_key");
- String bucketName = getStringValue("s3_bucket");
- String s3Key = "global.config.json";
- HashMap<String, String> contextParam = buildContextParam();
- writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey,
- contextParam, s3Key);
+ writeConfigToS3(config, GLOBAL_CONFIG_S3_PATH_SUFFIX, s3OutputConfiguration);
uploadedGlobalConfig = true;
}
}
+ /**
+ * Write a log line to local file, to upload to S3 bucket asynchronously.
+ *
+ * This method uses a {@link LogSpooler} to spool the log lines to a local file.
+
+ * @param block The log event to upload
+ * @param inputMarker Contains information about the log file feeding the lines.
+ * @throws Exception
+ */
@Override
public void write(String block, InputMarker inputMarker) throws Exception {
- throw new UnsupportedOperationException(
- "write method is not yet supported for output=s3_file");
+ if (logSpooler == null) {
+ logSpooler = createSpooler(inputMarker.input.getFilePath());
+ s3Uploader = createUploader(inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
+ }
+ logSpooler.add(block);
+ }
+
+ @VisibleForTesting
+ protected S3Uploader createUploader(String logType) {
+ S3Uploader uploader = new S3Uploader(s3OutputConfiguration, S3Util.INSTANCE, true, logType);
+ uploader.startUploaderThread();
+ return uploader;
+ }
+
+ @VisibleForTesting
+ protected LogSpooler createSpooler(String filePath) {
+ String spoolDirectory = LogFeederUtil.getLogfeederTempDir() + "/s3/service";
+ logger.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s",
+ spoolDirectory, filePath));
+ return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this,
+ s3OutputConfiguration.getRolloverTimeThresholdSecs());
+ }
+
+ /**
+ * Check whether the locally spooled file should be rolled over, based on file size.
+ *
+ * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked
+ * for rollover.
+ * @return true if sufficient size has been reached based on {@link S3OutputConfiguration#getRolloverSizeThresholdBytes()},
+ * false otherwise
+ */
+ @Override
+ public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
+ File spoolFile = currentSpoolerContext.getActiveSpoolFile();
+ long currentSize = spoolFile.length();
+ boolean result = (currentSize >= s3OutputConfiguration.getRolloverSizeThresholdBytes());
+ if (result) {
+ logger.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize,
+ s3OutputConfiguration.getRolloverSizeThresholdBytes()));
+ }
+ return result;
+ }
+
+ /**
+ * Stops dependent objects that consume resources.
+ */
+ @Override
+ public void close() {
+ if (s3Uploader != null) {
+ s3Uploader.stopUploaderThread();
+ }
+ if (logSpooler != null) {
+ logSpooler.close();
+ }
+ }
+
+ /**
+ * Adds the locally spooled file to the {@link S3Uploader} to be uploaded asynchronously.
+ *
+ * @param rolloverFile The file that has been rolled over.
+ */
+ @Override
+ public void handleRollover(File rolloverFile) {
+ s3Uploader.addFileForUpload(rolloverFile.getAbsolutePath());
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
new file mode 100644
index 0000000..1bbf33e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+
+import java.util.HashMap;
+
+/**
+ * A utility class that resolves variables like hostname, IP address and cluster name in S3 paths.
+ */
+public class S3LogPathResolver {
+
+ /**
+ * Construct a full S3 path by resolving variables in the path name including hostname, IP address
+ * and cluster name
+ * @param baseKeyPrefix The prefix which can contain the variables.
+ * @param keySuffix The suffix appended to the prefix after variable expansion
+ * @param cluster The name of the cluster
+ * @return full S3 path.
+ */
+ public String getResolvedPath(String baseKeyPrefix, String keySuffix, String cluster) {
+ HashMap<String, String> contextParam = buildContextParam(cluster);
+ String resolvedKeyPrefix = PlaceholderUtil.replaceVariables(baseKeyPrefix, contextParam);
+ return resolvedKeyPrefix + S3Util.S3_PATH_SEPARATOR + keySuffix;
+ }
+
+ private HashMap<String, String> buildContextParam(String cluster) {
+ HashMap<String, String> contextParam = new HashMap<>();
+ contextParam.put("host", LogFeederUtil.hostName);
+ contextParam.put("ip", LogFeederUtil.ipAddress);
+ contextParam.put("cluster", cluster);
+ return contextParam;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
new file mode 100644
index 0000000..fb597d3
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.ConfigBlock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Holds all configuration relevant for S3 upload.
+ */
+public class S3OutputConfiguration {
+
+ public static final String SPOOL_DIR_KEY = "spool_dir";
+ public static final String ROLLOVER_SIZE_THRESHOLD_BYTES_KEY = "rollover_size_threshold_bytes";
+ public static final Long DEFAULT_ROLLOVER_SIZE_THRESHOLD_BYTES = 10 * 1024 * 1024L;
+ public static final String ROLLOVER_TIME_THRESHOLD_SECS_KEY = "rollover_time_threshold_secs";
+ public static final Long DEFAULT_ROLLOVER_TIME_THRESHOLD_SECS = 3600L;
+ public static final String S3_BUCKET_NAME_KEY = "s3_bucket";
+ public static final String S3_LOG_DIR_KEY = "s3_log_dir";
+ public static final String S3_ACCESS_KEY = "s3_access_key";
+ public static final String S3_SECRET_KEY = "s3_secret_key";
+ public static final String COMPRESSION_ALGO_KEY = "compression_algo";
+ public static final String ADDITIONAL_FIELDS_KEY = "add_fields";
+ public static final String CLUSTER_KEY = "cluster";
+
+ private Map<String, Object> configs;
+
+ S3OutputConfiguration(Map<String, Object> configs) {
+ this.configs = configs;
+ }
+
+ public String getS3BucketName() {
+ return (String) configs.get(S3_BUCKET_NAME_KEY);
+ }
+
+ public String getS3Path() {
+ return (String) configs.get(S3_LOG_DIR_KEY);
+ }
+
+ public String getS3AccessKey() {
+ return (String) configs.get(S3_ACCESS_KEY);
+ }
+
+ public String getS3SecretKey() {
+ return (String) configs.get(S3_SECRET_KEY);
+ }
+
+ public String getCompressionAlgo() {
+ return (String) configs.get(COMPRESSION_ALGO_KEY);
+ }
+
+ public Long getRolloverSizeThresholdBytes() {
+ return (Long) configs.get(ROLLOVER_SIZE_THRESHOLD_BYTES_KEY);
+ }
+
+ public Long getRolloverTimeThresholdSecs() {
+ return (Long) configs.get(ROLLOVER_TIME_THRESHOLD_SECS_KEY);
+ }
+
+ @SuppressWarnings("unchecked")
+ public String getCluster() {
+ return ((Map<String, String>) configs.get(ADDITIONAL_FIELDS_KEY)).get(CLUSTER_KEY);
+ }
+
+ public static S3OutputConfiguration fromConfigBlock(ConfigBlock configBlock) {
+ Map<String, Object> configs = new HashMap<>();
+ String[] stringValuedKeysToCopy = new String[] {
+ SPOOL_DIR_KEY, S3_BUCKET_NAME_KEY, S3_LOG_DIR_KEY,
+ S3_ACCESS_KEY, S3_SECRET_KEY, COMPRESSION_ALGO_KEY
+ };
+
+ for (String key : stringValuedKeysToCopy) {
+ String value = configBlock.getStringValue(key);
+ if (value != null) {
+ configs.put(key, value);
+ }
+ }
+
+ String[] longValuedKeysToCopy = new String[] {
+ ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, ROLLOVER_TIME_THRESHOLD_SECS_KEY
+ };
+
+ Long[] defaultValuesForLongValuedKeys = new Long[] {
+ DEFAULT_ROLLOVER_SIZE_THRESHOLD_BYTES, DEFAULT_ROLLOVER_TIME_THRESHOLD_SECS
+ };
+
+ for (int i = 0; i < longValuedKeysToCopy.length; i++) {
+ configs.put(longValuedKeysToCopy[i],
+ configBlock.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
+ }
+
+ configs.put(ADDITIONAL_FIELDS_KEY, configBlock.getNVList(ADDITIONAL_FIELDS_KEY));
+
+ return new S3OutputConfiguration(configs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
new file mode 100644
index 0000000..dec685f
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.CompressionUtil;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.util.Date;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A class that handles the uploading of files to S3.
+ *
+ * This class can be used to upload a file one time, or start a daemon thread that can
+ * be used to upload files added to a queue one after the other. When used to upload
+ * files via a queue, one instance of this class is created for each file handled in
+ * {@link org.apache.ambari.logfeeder.input.InputFile}.
+ */
+public class S3Uploader implements Runnable {
+ public static final String POISON_PILL = "POISON-PILL";
+ private static Logger logger = Logger.getLogger(S3Uploader.class);
+
+ private final S3OutputConfiguration s3OutputConfiguration;
+ private final S3Util s3UtilInstance;
+ private final boolean deleteOnEnd;
+ private String logType;
+ private final BlockingQueue<String> fileContextsToUpload;
+ private AtomicBoolean stopRunningThread = new AtomicBoolean(false);
+
+ public S3Uploader(S3OutputConfiguration s3OutputConfiguration, S3Util s3UtilInstance, boolean deleteOnEnd,
+ String logType) {
+ this.s3OutputConfiguration = s3OutputConfiguration;
+ this.s3UtilInstance = s3UtilInstance;
+ this.deleteOnEnd = deleteOnEnd;
+ this.logType = logType;
+ this.fileContextsToUpload = new LinkedBlockingQueue<>();
+ }
+
+ /**
+ * Starts a thread that can be used to upload files from a queue.
+ *
+ * Add files to be uploaded using the method {@link #addFileForUpload(String)}.
+ * If this thread is started, it must be stopped using the method {@link #stopUploaderThread()}.
+ */
+ void startUploaderThread() {
+ Thread s3UploaderThread = new Thread(this, "s3-uploader-thread-"+logType);
+ s3UploaderThread.setDaemon(true);
+ s3UploaderThread.start();
+ }
+
+ /**
+ * Stops the thread used to upload files from a queue.
+ *
+ * This method must be called to cleanly free up resources, typically on shutdown of the process.
+ * Note that this method does not drain any remaining files, and instead stops the thread
+ * as soon as any file being currently uploaded is complete.
+ */
+ void stopUploaderThread() {
+ stopRunningThread.set(true);
+ boolean offerStatus = fileContextsToUpload.offer(POISON_PILL);
+ if (!offerStatus) {
+ logger.warn("Could not add poison pill to interrupt uploader thread.");
+ }
+ }
+
+ /**
+ * Add a file to a queue to upload asynchronously.
+ * @param fileToUpload Full path to the local file which must be uploaded.
+ */
+ void addFileForUpload(String fileToUpload) {
+ boolean offerStatus = fileContextsToUpload.offer(fileToUpload);
+ if (!offerStatus) {
+ logger.error("Could not add file " + fileToUpload + " for upload.");
+ }
+ }
+
+ @Override
+ public void run() {
+ while (!stopRunningThread.get()) {
+ try {
+ String fileNameToUpload = fileContextsToUpload.take();
+ if (POISON_PILL.equals(fileNameToUpload)) {
+ logger.warn("Found poison pill while waiting for files to upload, exiting");
+ return;
+ }
+ uploadFile(new File(fileNameToUpload), logType);
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while waiting for elements from fileContextsToUpload", e);
+ return;
+ }
+ }
+ }
+
+ /**
+ * Upload the given file to S3.
+ *
+ * The file which should be available locally, is first compressed using the compression
+ * method specified by {@link S3OutputConfiguration#getCompressionAlgo()}. This compressed
+ * file is what is uploaded to S3.
+ * @param fileToUpload the file to upload
+ * @param logType the name of the log which is used in the S3 path constructed.
+ * @return
+ */
+ String uploadFile(File fileToUpload, String logType) {
+ String bucketName = s3OutputConfiguration.getS3BucketName();
+ String s3AccessKey = s3OutputConfiguration.getS3AccessKey();
+ String s3SecretKey = s3OutputConfiguration.getS3SecretKey();
+ String compressionAlgo = s3OutputConfiguration.getCompressionAlgo();
+
+ String keySuffix = fileToUpload.getName() + "." + compressionAlgo;
+ String s3Path = new S3LogPathResolver().
+ getResolvedPath(s3OutputConfiguration.getS3Path()+S3Util.S3_PATH_SEPARATOR+logType,
+ keySuffix, s3OutputConfiguration.getCluster());
+ logger.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s",
+ s3OutputConfiguration.getS3Path(), keySuffix, s3Path));
+ File sourceFile = createCompressedFileForUpload(fileToUpload, compressionAlgo);
+
+ logger.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path);
+ s3UtilInstance.uploadFileTos3(bucketName, s3Path, sourceFile, s3AccessKey,
+ s3SecretKey);
+
+ // delete local compressed file
+ sourceFile.delete();
+ if (deleteOnEnd) {
+ logger.info("Deleting input file as required");
+ if (!fileToUpload.delete()) {
+ logger.error("Could not delete file " + fileToUpload.getAbsolutePath() + " after upload to S3");
+ }
+ }
+ return s3Path;
+ }
+
+ @VisibleForTesting
+ protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
+ File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_"
+ + new Date().getTime() + "." + compressionAlgo);
+ outputFile = CompressionUtil.compressFile(fileToUpload, outputFile,
+ compressionAlgo);
+ return outputFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
index 306326a..fb263ba 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
@@ -25,6 +25,9 @@ import org.apache.log4j.Logger;
import java.io.*;
import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* A class that manages local storage of log events before they are uploaded to the output destinations.
@@ -36,6 +39,7 @@ import java.util.Date;
* {@link RolloverHandler} to trigger the handling of the rolled over file.
*/
public class LogSpooler {
+ public static final long TIME_BASED_ROLLOVER_DISABLED_THRESHOLD = 0;
static private Logger logger = Logger.getLogger(LogSpooler.class);
static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
@@ -46,6 +50,8 @@ public class LogSpooler {
private PrintWriter currentSpoolBufferedWriter;
private File currentSpoolFile;
private LogSpoolerContext currentSpoolerContext;
+ private Timer rolloverTimer;
+ private AtomicBoolean rolloverInProgress = new AtomicBoolean(false);
/**
* Create an instance of the LogSpooler.
@@ -59,11 +65,34 @@ public class LogSpooler {
*/
public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition,
RolloverHandler rolloverHandler) {
+ this(spoolDirectory, sourceFileNamePrefix, rolloverCondition, rolloverHandler,
+ TIME_BASED_ROLLOVER_DISABLED_THRESHOLD);
+ }
+
+ /**
+ * Create an instance of the LogSpooler.
+ * @param spoolDirectory The directory under which spooler files are created.
+ * Should be unique per instance of {@link Output}
+ * @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
+ * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
+ * determine when to rollover.
+ * @param rolloverHandler An object of type {@link RolloverHandler} that will be called when
+ * there should be a rollover.
+ * @param rolloverTimeThresholdSecs Setting a non-zero value enables time based rollover of
+ * spool files. Sending a 0 value disables this functionality.
+ */
+ public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition,
+ RolloverHandler rolloverHandler, long rolloverTimeThresholdSecs) {
this.spoolDirectory = spoolDirectory;
this.sourceFileNamePrefix = sourceFileNamePrefix;
this.rolloverCondition = rolloverCondition;
this.rolloverHandler = rolloverHandler;
- initializeSpoolFile();
+ if (rolloverTimeThresholdSecs != TIME_BASED_ROLLOVER_DISABLED_THRESHOLD) {
+ rolloverTimer = new Timer("log-spooler-timer-" + sourceFileNamePrefix, true);
+ rolloverTimer.scheduleAtFixedRate(new LogSpoolerRolloverTimerTask(),
+ rolloverTimeThresholdSecs*1000, rolloverTimeThresholdSecs*1000);
+ }
+ initializeSpoolState();
}
private void initializeSpoolDirectory() {
@@ -77,9 +106,9 @@ public class LogSpooler {
}
}
- private void initializeSpoolFile() {
+ private void initializeSpoolState() {
initializeSpoolDirectory();
- currentSpoolFile = new File(spoolDirectory, getCurrentFileName());
+ currentSpoolFile = initializeSpoolFile();
try {
currentSpoolBufferedWriter = initializeSpoolWriter(currentSpoolFile);
} catch (IOException e) {
@@ -87,7 +116,12 @@ public class LogSpooler {
+ ", error message: " + e.getLocalizedMessage(), e);
}
currentSpoolerContext = new LogSpoolerContext(currentSpoolFile);
- logger.info("Initialized spool file at path: " + currentSpoolFile.getAbsolutePath());
+ logger.info("Initialized spool file at path: " + currentSpoolFile);
+ }
+
+ @VisibleForTesting
+ protected File initializeSpoolFile() {
+ return new File(spoolDirectory, getCurrentFileName());
}
@VisibleForTesting
@@ -103,11 +137,12 @@ public class LogSpooler {
* it is ready to rollover the file.
* @param logEvent The log event to spool.
*/
- public void add(String logEvent) {
+ public synchronized void add(String logEvent) {
currentSpoolBufferedWriter.println(logEvent);
currentSpoolerContext.logEventSpooled();
if (rolloverCondition.shouldRollover(currentSpoolerContext)) {
- rollover();
+ logger.info("Trying to rollover based on rollover condition");
+ tryRollover();
}
}
@@ -121,17 +156,49 @@ public class LogSpooler {
public void rollover() {
logger.info("Rollover condition detected, rolling over file: " + currentSpoolFile);
currentSpoolBufferedWriter.flush();
- currentSpoolBufferedWriter.close();
- rolloverHandler.handleRollover(currentSpoolFile);
- logger.info("Invoked rollover handler with file: " + currentSpoolFile);
- initializeSpoolFile();
+ if (currentSpoolFile.length()==0) {
+ logger.info("No data in file " + currentSpoolFile + ", not doing rollover");
+ } else {
+ currentSpoolBufferedWriter.close();
+ rolloverHandler.handleRollover(currentSpoolFile);
+ logger.info("Invoked rollover handler with file: " + currentSpoolFile);
+ initializeSpoolState();
+ }
+ boolean status = rolloverInProgress.compareAndSet(true, false);
+ if (!status) {
+ logger.error("Should have reset rollover flag!!");
+ }
}
- @VisibleForTesting
- protected String getCurrentFileName() {
+ private synchronized void tryRollover() {
+ if (rolloverInProgress.compareAndSet(false, true)) {
+ rollover();
+ } else {
+ logger.warn("Ignoring rollover call as rollover already in progress for file " +
+ currentSpoolFile);
+ }
+ }
+
+ private String getCurrentFileName() {
Date currentDate = new Date();
String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
return sourceFileNamePrefix + dateStr;
}
+ /**
+ * Cancel's any time based rollover task, if started.
+ */
+ public void close() {
+ if (rolloverTimer != null) {
+ rolloverTimer.cancel();
+ }
+ }
+
+ private class LogSpoolerRolloverTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ logger.info("Trying rollover based on time");
+ tryRollover();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
index ced2b5c..db187be 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
@@ -44,13 +44,13 @@ import com.amazonaws.services.s3.transfer.Upload;
/**
* Utility to connect to s3
*/
-public enum S3Util {
- INSTANCE;
+public class S3Util {
+ public static final S3Util INSTANCE = new S3Util();
private static final Logger LOG = Logger.getLogger(S3Util.class);
- public final String S3_PATH_START_WITH = "s3://";
- public final String S3_PATH_SEPARATOR = "/";
+ public static final String S3_PATH_START_WITH = "s3://";
+ public static final String S3_PATH_SEPARATOR = "/";
public AmazonS3 getS3Client(String accessKey, String secretKey) {
AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
new file mode 100644
index 0000000..20a4f1f
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.spool.LogSpooler;
+import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class OutputS3FileTest {
+
+ private Map<String, Object> configMap;
+
+ @Before
+ public void setupConfiguration() {
+ configMap = new HashMap<>();
+ String[] configKeys = new String[] {
+ S3OutputConfiguration.SPOOL_DIR_KEY,
+ S3OutputConfiguration.S3_BUCKET_NAME_KEY,
+ S3OutputConfiguration.S3_LOG_DIR_KEY,
+ S3OutputConfiguration.S3_ACCESS_KEY,
+ S3OutputConfiguration.S3_SECRET_KEY,
+ S3OutputConfiguration.COMPRESSION_ALGO_KEY,
+ S3OutputConfiguration.ADDITIONAL_FIELDS_KEY
+ };
+ Map<String, String> additionalKeys = new HashMap<>();
+ additionalKeys.put(S3OutputConfiguration.CLUSTER_KEY, "cl1");
+ Object[] configValues = new Object[] {
+ "/var/ambari-logsearch/logfeeder",
+ "s3_bucket_name",
+ "logs",
+ "ABCDEFGHIJ1234",
+ "amdfbldkfdlf",
+ "gz",
+ additionalKeys
+ };
+ for (int i = 0; i < configKeys.length; i++) {
+ configMap.put(configKeys[i], configValues[i]);
+ }
+ }
+
+ @Test
+ public void shouldSpoolLogEventToNewSpooler() throws Exception {
+
+ InputMarker inputMarker = mock(InputMarker.class);
+ Input input = mock(Input.class);
+ inputMarker.input = input;
+ expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
+ expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
+ final LogSpooler spooler = mock(LogSpooler.class);
+ spooler.add("log event block");
+ final S3Uploader s3Uploader = mock(S3Uploader.class);
+ replay(input, inputMarker, spooler, s3Uploader);
+
+ OutputS3File outputS3File = new OutputS3File() {
+ @Override
+ protected LogSpooler createSpooler(String filePath) {
+ return spooler;
+ }
+
+ @Override
+ protected S3Uploader createUploader(String logType) {
+ return s3Uploader;
+ }
+ };
+ outputS3File.loadConfig(configMap);
+ outputS3File.init();
+ outputS3File.write("log event block", inputMarker);
+ verify(spooler);
+ }
+
+ @Test
+ public void shouldReuseSpoolerForSamePath() throws Exception {
+ InputMarker inputMarker = mock(InputMarker.class);
+ Input input = mock(Input.class);
+ inputMarker.input = input;
+ expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
+ expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
+ final LogSpooler spooler = mock(LogSpooler.class);
+ spooler.add("log event block1");
+ spooler.add("log event block2");
+ final S3Uploader s3Uploader = mock(S3Uploader.class);
+ replay(input, inputMarker, spooler, s3Uploader);
+
+ OutputS3File outputS3File = new OutputS3File() {
+ private boolean firstCallComplete;
+ @Override
+ protected LogSpooler createSpooler(String filePath) {
+ if (!firstCallComplete) {
+ firstCallComplete = true;
+ return spooler;
+ }
+ throw new IllegalStateException("Shouldn't call createSpooler for same path.");
+ }
+
+ @Override
+ protected S3Uploader createUploader(String logType) {
+ return s3Uploader;
+ }
+ };
+ outputS3File.loadConfig(configMap);
+ outputS3File.init();
+ outputS3File.write("log event block1", inputMarker);
+ outputS3File.write("log event block2", inputMarker);
+ verify(spooler);
+ }
+
+ @Test
+ public void shouldRolloverWhenSufficientSizeIsReached() throws Exception {
+
+ String thresholdSize = Long.toString(15 * 1024 * 1024L);
+ LogSpoolerContext logSpoolerContext = mock(LogSpoolerContext.class);
+ File activeSpoolFile = mock(File.class);
+ expect(activeSpoolFile.length()).andReturn(20*1024*1024L);
+ expect(logSpoolerContext.getActiveSpoolFile()).andReturn(activeSpoolFile);
+ replay(logSpoolerContext, activeSpoolFile);
+
+ OutputS3File outputS3File = new OutputS3File();
+ configMap.put(S3OutputConfiguration.ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, thresholdSize);
+ outputS3File.loadConfig(configMap);
+ outputS3File.init();
+
+ assertTrue(outputS3File.shouldRollover(logSpoolerContext));
+ }
+
+ @Test
+ public void shouldNotRolloverBeforeSufficientSizeIsReached() throws Exception {
+ String thresholdSize = Long.toString(15 * 1024 * 1024L);
+ LogSpoolerContext logSpoolerContext = mock(LogSpoolerContext.class);
+ File activeSpoolFile = mock(File.class);
+ expect(activeSpoolFile.length()).andReturn(10*1024*1024L);
+ expect(logSpoolerContext.getActiveSpoolFile()).andReturn(activeSpoolFile);
+ replay(logSpoolerContext, activeSpoolFile);
+
+ OutputS3File outputS3File = new OutputS3File();
+ configMap.put(S3OutputConfiguration.ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, thresholdSize);
+ outputS3File.loadConfig(configMap);
+ outputS3File.init();
+
+ assertFalse(outputS3File.shouldRollover(logSpoolerContext));
+ }
+
+ @Test
+ public void shouldUploadFileOnRollover() throws Exception {
+ InputMarker inputMarker = mock(InputMarker.class);
+ Input input = mock(Input.class);
+ inputMarker.input = input;
+ expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
+ expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
+ final LogSpooler spooler = mock(LogSpooler.class);
+ spooler.add("log event block1");
+ final S3Uploader s3Uploader = mock(S3Uploader.class);
+ s3Uploader.addFileForUpload("/var/ambari-logsearch/logfeeder/hdfs-namenode.log.gz");
+ replay(input, inputMarker, spooler, s3Uploader);
+
+ OutputS3File outputS3File = new OutputS3File() {
+ @Override
+ protected LogSpooler createSpooler(String filePath) {
+ return spooler;
+ }
+ @Override
+ protected S3Uploader createUploader(String logType) {
+ return s3Uploader;
+ }
+ };
+ outputS3File.write("log event block1", inputMarker);
+ outputS3File.handleRollover(new File("/var/ambari-logsearch/logfeeder/hdfs-namenode.log.gz"));
+
+ verify(s3Uploader);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
new file mode 100644
index 0000000..49cee56
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class S3LogPathResolverTest {
+
+ @Test
+ public void shouldResolveHostName() {
+ String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$host", "filename.log", "cl1");
+ assertEquals("my_s3_path/" + LogFeederUtil.hostName + "/filename.log", resolvedPath);
+ }
+
+ @Test
+ public void shouldResolveIpAddress() {
+ String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$ip", "filename.log", "cl1");
+ assertEquals("my_s3_path/" + LogFeederUtil.ipAddress + "/filename.log", resolvedPath);
+ }
+
+ @Test
+ public void shouldResolveCluster() {
+ String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$cluster", "filename.log", "cl1");
+ assertEquals("my_s3_path/cl1/filename.log", resolvedPath);
+ }
+
+ @Test
+ public void shouldResolveCombinations() {
+ String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$cluster/$host", "filename.log", "cl1");
+ assertEquals("my_s3_path/cl1/"+ LogFeederUtil.hostName + "/filename.log", resolvedPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
new file mode 100644
index 0000000..a0c398e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.s3.S3Util;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertEquals;
+
+public class S3UploaderTest {
+
+ public static final String TEST_BUCKET = "test_bucket";
+ public static final String TEST_PATH = "test_path";
+ public static final String GZ = "gz";
+ public static final String LOG_TYPE = "hdfs_namenode";
+ public static final String ACCESS_KEY_VALUE = "accessKeyValue";
+ public static final String SECRET_KEY_VALUE = "secretKeyValue";
+
+ @Test
+ public void shouldUploadToS3ToRightBucket() {
+ File fileToUpload = mock(File.class);
+ String fileName = "hdfs_namenode.log.123343493473948";
+ expect(fileToUpload.getName()).andReturn(fileName);
+ final File compressedFile = mock(File.class);
+ Map<String, Object> configs = setupS3Configs();
+
+ S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
+ S3Util s3Util = mock(S3Util.class);
+ String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ);
+ s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
+ expect(compressedFile.delete()).andReturn(true);
+ expect(fileToUpload.delete()).andReturn(true);
+ replay(fileToUpload, compressedFile, s3Util);
+
+ S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) {
+ @Override
+ protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
+ return compressedFile;
+ }
+ };
+ String resolvedPath = s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
+
+ verify(s3Util);
+ assertEquals("test_path/hdfs_namenode/hdfs_namenode.log.123343493473948.gz", resolvedPath);
+ }
+
+ @Test
+ public void shouldCleanupLocalFilesOnSuccessfulUpload() {
+ File fileToUpload = mock(File.class);
+ String fileName = "hdfs_namenode.log.123343493473948";
+ expect(fileToUpload.getName()).andReturn(fileName);
+ final File compressedFile = mock(File.class);
+ Map<String, Object> configs = setupS3Configs();
+
+ S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
+ S3Util s3Util = mock(S3Util.class);
+ String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ);
+ s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
+ expect(compressedFile.delete()).andReturn(true);
+ expect(fileToUpload.delete()).andReturn(true);
+ replay(fileToUpload, compressedFile, s3Util);
+
+ S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) {
+ @Override
+ protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
+ return compressedFile;
+ }
+ };
+ s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
+
+ verify(fileToUpload);
+ verify(compressedFile);
+ }
+
+ @Test
+ public void shouldNotCleanupUncompressedFileIfNotRequired() {
+ File fileToUpload = mock(File.class);
+ String fileName = "hdfs_namenode.log.123343493473948";
+ expect(fileToUpload.getName()).andReturn(fileName);
+ final File compressedFile = mock(File.class);
+ Map<String, Object> configs = setupS3Configs();
+
+ S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
+ S3Util s3Util = mock(S3Util.class);
+ String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ);
+ s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
+ expect(compressedFile.delete()).andReturn(true);
+ replay(fileToUpload, compressedFile, s3Util);
+
+ S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, false, LOG_TYPE) {
+ @Override
+ protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
+ return compressedFile;
+ }
+ };
+ s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
+
+ verify(fileToUpload);
+ verify(compressedFile);
+ }
+
+ @Test
+ public void shouldExpandVariablesInPath() {
+ File fileToUpload = mock(File.class);
+ String fileName = "hdfs_namenode.log.123343493473948";
+ expect(fileToUpload.getName()).andReturn(fileName);
+ final File compressedFile = mock(File.class);
+ Map<String, Object> configs = setupS3Configs();
+ configs.put(S3OutputConfiguration.S3_LOG_DIR_KEY, "$cluster/"+TEST_PATH);
+
+
+ S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
+ S3Util s3Util = mock(S3Util.class);
+ String s3Key = String.format("%s/%s/%s/%s.%s", "cl1", TEST_PATH, LOG_TYPE, fileName, GZ);
+ s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
+ expect(compressedFile.delete()).andReturn(true);
+ expect(fileToUpload.delete()).andReturn(true);
+ replay(fileToUpload, compressedFile, s3Util);
+
+ S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) {
+ @Override
+ protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
+ return compressedFile;
+ }
+ };
+ s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
+
+ verify(s3Util);
+ }
+
+ private Map<String, Object> setupS3Configs() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(S3OutputConfiguration.S3_BUCKET_NAME_KEY, TEST_BUCKET);
+ configs.put(S3OutputConfiguration.S3_LOG_DIR_KEY, TEST_PATH);
+ configs.put(S3OutputConfiguration.S3_ACCESS_KEY, ACCESS_KEY_VALUE);
+ configs.put(S3OutputConfiguration.S3_SECRET_KEY, SECRET_KEY_VALUE);
+ configs.put(S3OutputConfiguration.COMPRESSION_ALGO_KEY, GZ);
+ Map<String, String> nameValueMap = new HashMap<>();
+ nameValueMap.put(S3OutputConfiguration.CLUSTER_KEY, "cl1");
+ configs.put(S3OutputConfiguration.ADDITIONAL_FIELDS_KEY, nameValueMap);
+ return configs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
index 7d9d78a..7a47039 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
@@ -43,7 +43,6 @@ public class LogSpoolerTest {
private String spoolDirectory;
private static final String SOURCE_FILENAME_PREFIX = "hdfs-namenode.log";
- private static final String FILE_SUFFIX = "currentFile";
@Mock
private RolloverCondition rolloverCondition;
@@ -61,13 +60,13 @@ public class LogSpoolerTest {
final PrintWriter spoolWriter = mock(PrintWriter.class);
spoolWriter.println("log event");
- final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
- LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+ final File mockFile = setupInputFileExpectations();
+ LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
expect(rolloverCondition.shouldRollover(
cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
andReturn(false);
- replay(spoolWriter, rolloverCondition);
+ replay(spoolWriter, rolloverCondition, mockFile);
LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
rolloverCondition, rolloverHandler) {
@@ -77,8 +76,8 @@ public class LogSpoolerTest {
}
@Override
- protected String getCurrentFileName() {
- return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+ protected File initializeSpoolFile() {
+ return mockFile;
}
};
logSpooler.add("log event");
@@ -86,20 +85,26 @@ public class LogSpoolerTest {
verify(spoolWriter);
}
+ private File setupInputFileExpectations() {
+ final File mockFile = mock(File.class);
+ expect(mockFile.length()).andReturn(10240L);
+ return mockFile;
+ }
+
@Test
public void shouldIncrementSpooledEventsCount() {
final PrintWriter spoolWriter = mock(PrintWriter.class);
spoolWriter.println("log event");
- final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
- LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+ final File mockFile = setupInputFileExpectations();
+ LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
logSpoolerContext.logEventSpooled();
expect(rolloverCondition.shouldRollover(
cmp(logSpoolerContext, new LogSpoolerEventCountComparator(), LogicalOperator.EQUAL))).
andReturn(false);
- replay(spoolWriter, rolloverCondition);
+ replay(spoolWriter, rolloverCondition, mockFile);
LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
rolloverCondition, rolloverHandler) {
@@ -109,8 +114,8 @@ public class LogSpoolerTest {
}
@Override
- protected String getCurrentFileName() {
- return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+ protected File initializeSpoolFile() {
+ return mockFile;
}
};
logSpooler.add("log event");
@@ -125,14 +130,14 @@ public class LogSpoolerTest {
spoolWriter.flush();
spoolWriter.close();
- File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
- LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+ final File mockFile = setupInputFileExpectations();
+ LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
expect(rolloverCondition.shouldRollover(
cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
andReturn(true);
- rolloverHandler.handleRollover(spoolFile);
+ rolloverHandler.handleRollover(mockFile);
- replay(spoolWriter, rolloverCondition, rolloverHandler);
+ replay(spoolWriter, rolloverCondition, rolloverHandler, mockFile);
LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
rolloverCondition, rolloverHandler) {
@@ -143,8 +148,8 @@ public class LogSpoolerTest {
}
@Override
- protected String getCurrentFileName() {
- return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+ protected File initializeSpoolFile() {
+ return mockFile;
}
};
logSpooler.add("log event");
@@ -161,22 +166,22 @@ public class LogSpoolerTest {
spoolWriter1.flush();
spoolWriter1.close();
- File spoolFile1 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1");
- File spoolFile2 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2");
+ final File mockFile1 = setupInputFileExpectations();
+ final File mockFile2 = setupInputFileExpectations();
- LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(spoolFile1);
+ LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(mockFile1);
expect(rolloverCondition.shouldRollover(
cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
).andReturn(true);
- LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(spoolFile2);
+ LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(mockFile2);
expect(rolloverCondition.shouldRollover(
cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
).andReturn(false);
- rolloverHandler.handleRollover(spoolFile1);
+ rolloverHandler.handleRollover(mockFile1);
- replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler);
+ replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler, mockFile1, mockFile2);
LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
rolloverCondition, rolloverHandler) {
@@ -193,11 +198,11 @@ public class LogSpoolerTest {
}
@Override
- protected String getCurrentFileName() {
+ protected File initializeSpoolFile() {
if (!wasRolledOver) {
- return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1";
+ return mockFile1;
} else {
- return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2";
+ return mockFile2;
}
}
};
@@ -214,14 +219,14 @@ public class LogSpoolerTest {
spoolWriter.flush();
spoolWriter.close();
- File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
- LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+ final File mockFile = setupInputFileExpectations();
+ LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
expect(rolloverCondition.shouldRollover(
cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
).andReturn(true);
- rolloverHandler.handleRollover(spoolFile);
+ rolloverHandler.handleRollover(mockFile);
- replay(spoolWriter, rolloverCondition, rolloverHandler);
+ replay(spoolWriter, rolloverCondition, rolloverHandler, mockFile);
LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
rolloverCondition, rolloverHandler) {
@@ -232,8 +237,8 @@ public class LogSpoolerTest {
}
@Override
- protected String getCurrentFileName() {
- return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+ protected File initializeSpoolFile() {
+ return mockFile;
}
};
logSpooler.add("log event");
@@ -241,10 +246,121 @@ public class LogSpoolerTest {
verify(rolloverHandler);
}
+ // Rollover twice - the second rollover should work if the "rolloverInProgress"
+ // flag is being reset correctly. Third file expectations being setup due
+ // to auto-initialization.
+ @Test
+ public void shouldResetRolloverInProgressFlag() {
+ final PrintWriter spoolWriter1 = mock(PrintWriter.class);
+ final PrintWriter spoolWriter2 = mock(PrintWriter.class);
+ final PrintWriter spoolWriter3 = mock(PrintWriter.class);
+ spoolWriter1.println("log event1");
+ spoolWriter2.println("log event2");
+ spoolWriter1.flush();
+ spoolWriter1.close();
+ spoolWriter2.flush();
+ spoolWriter2.close();
+
+ final File mockFile1 = setupInputFileExpectations();
+ final File mockFile2 = setupInputFileExpectations();
+ final File mockFile3 = setupInputFileExpectations();
+
+ LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(mockFile1);
+ expect(rolloverCondition.shouldRollover(
+ cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
+ ).andReturn(true);
+
+ LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(mockFile2);
+ expect(rolloverCondition.shouldRollover(
+ cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
+ ).andReturn(true);
+
+ rolloverHandler.handleRollover(mockFile1);
+ rolloverHandler.handleRollover(mockFile2);
+
+ replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler, mockFile1, mockFile2, mockFile3);
+
+ LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+ rolloverCondition, rolloverHandler) {
+ private int currentFileNum;
+
+ @Override
+ protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+ PrintWriter spoolWriter = null;
+ switch (currentFileNum) {
+ case 0:
+ spoolWriter = spoolWriter1;
+ break;
+ case 1:
+ spoolWriter = spoolWriter2;
+ break;
+ case 2:
+ spoolWriter = spoolWriter3;
+ break;
+ }
+ currentFileNum++;
+ return spoolWriter;
+ }
+
+ @Override
+ protected File initializeSpoolFile() {
+ switch (currentFileNum) {
+ case 0:
+ return mockFile1;
+ case 1:
+ return mockFile2;
+ case 2:
+ return mockFile3;
+ default:
+ return null;
+ }
+ }
+ };
+ logSpooler.add("log event1");
+ logSpooler.add("log event2");
+
+ verify(spoolWriter1, spoolWriter2, rolloverCondition);
+ }
+
+ @Test
+ public void shouldNotRolloverZeroLengthFiles() {
+ final PrintWriter spoolWriter = mock(PrintWriter.class);
+ spoolWriter.println("log event");
+ spoolWriter.flush();
+ spoolWriter.close();
+
+ final File mockFile = mock(File.class);
+ expect(mockFile.length()).andReturn(0L);
+
+ LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
+ expect(rolloverCondition.shouldRollover(
+ cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
+ andReturn(true);
+
+ replay(spoolWriter, rolloverCondition, mockFile);
+
+ LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+ rolloverCondition, rolloverHandler) {
+
+ @Override
+ protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+ return spoolWriter;
+ }
+
+ @Override
+ protected File initializeSpoolFile() {
+ return mockFile;
+ }
+ };
+ logSpooler.add("log event");
+
+ verify(mockFile);
+ }
+
class LogSpoolerFileComparator implements Comparator<LogSpoolerContext> {
@Override
public int compare(LogSpoolerContext o1, LogSpoolerContext o2) {
- return o1.getActiveSpoolFile().compareTo(o2.getActiveSpoolFile());
+ return o1.getActiveSpoolFile()==o2.getActiveSpoolFile() ? 0 : -1;
}
}