You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2018/05/15 17:12:40 UTC
metron git commit: METRON-1552: Add gzip file validation check to the
geo loader (mmiklavc via mmiklavc) closes apache/metron#1011
Repository: metron
Updated Branches:
refs/heads/master 9ce4ba5a9 -> 20eaed239
METRON-1552: Add gzip file validation check to the geo loader (mmiklavc via mmiklavc) closes apache/metron#1011
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/20eaed23
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/20eaed23
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/20eaed23
Branch: refs/heads/master
Commit: 20eaed239b2552d0823d34f571b63d941c352bc9
Parents: 9ce4ba5
Author: mmiklavc <mi...@gmail.com>
Authored: Tue May 15 11:12:07 2018 -0600
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Tue May 15 11:12:07 2018 -0600
----------------------------------------------------------------------
.../common/utils/CompressionStrategies.java | 100 +++++++++++++++++++
.../common/utils/CompressionStrategy.java | 52 ++++++++++
.../common/utils/CompressionUtilsTest.java | 62 ++++++++++++
.../nonbulk/geo/GeoEnrichmentLoader.java | 70 +++++++++----
.../nonbulk/geo/GeoEnrichmentLoaderTest.java | 40 ++++++--
5 files changed, 298 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java
new file mode 100644
index 0000000..f9c53c8
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipException;
+
+/**
+ * Compression strategies exposed as enum constants; each constant delegates to its implementation.
+ */
+public enum CompressionStrategies implements CompressionStrategy {
+
+  GZIP(new CompressionStrategy() {
+    @Override
+    public void compress(File inFile, File outFile) throws IOException {
+      try (FileInputStream fis = new FileInputStream(inFile);
+          FileOutputStream fos = new FileOutputStream(outFile);
+          GZIPOutputStream gzipOS = new GZIPOutputStream(fos)) {
+        byte[] buffer = new byte[1024];
+        int len;
+        while ((len = fis.read(buffer)) != -1) {
+          gzipOS.write(buffer, 0, len);
+        }
+      }
+    }
+
+    @Override
+    public void decompress(File inFile, File outFile) throws IOException {
+      try (FileInputStream fis = new FileInputStream(inFile);
+          GZIPInputStream gis = new GZIPInputStream(fis);
+          FileOutputStream fos = new FileOutputStream(outFile)) {
+        byte[] buffer = new byte[1024];
+        int len;
+        while ((len = gis.read(buffer)) != -1) {
+          fos.write(buffer, 0, len);
+        }
+      }
+    }
+
+    /** Returns false for malformed, truncated or empty input; other I/O errors are rethrown unchecked. */
+    @Override
+    public boolean test(File gzipFile) {
+      try (FileInputStream fis = new FileInputStream(gzipFile);
+          GZIPInputStream gis = new GZIPInputStream(fis)) {
+        byte[] buffer = new byte[1024];
+        // Drain the whole stream: truncation and CRC/length mismatches only surface at the end.
+        while (gis.read(buffer) != -1) { /* inflated bytes are discarded */ }
+      } catch (ZipException | EOFException e) {
+        return false;
+      } catch (IOException e) {
+        throw new IllegalStateException("Error occurred while attempting to validate gzip file", e);
+      }
+      return true;
+    }
+  });
+
+  private final CompressionStrategy strategy;
+
+  CompressionStrategies(CompressionStrategy strategy) {
+    this.strategy = strategy;
+  }
+
+  @Override
+  public void compress(File inFile, File outFile) throws IOException {
+    strategy.compress(inFile, outFile);
+  }
+
+  @Override
+  public void decompress(File inFile, File outFile) throws IOException {
+    strategy.decompress(inFile, outFile);
+  }
+
+  @Override
+  public boolean test(File gzipFile) {
+    return strategy.test(gzipFile);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java
new file mode 100644
index 0000000..6a98c95
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import java.io.File;
+import java.io.IOException;
+
+public interface CompressionStrategy {
+ // Contract for file-based compression: compress, decompress and format validation.
+ /**
+ * Compresses {@code inFile}, writing the result to {@code outFile}.
+ *
+ * @param inFile file to compress
+ * @param outFile destination path for compressed output
+ * @throws IOException Any IO error
+ */
+ void compress(File inFile, File outFile) throws IOException;
+
+ /**
+ * Decompresses {@code inFile}, writing the result to {@code outFile}.
+ *
+ * @param inFile file to decompress
+ * @param outFile destination path for decompressed output
+ * @throws IOException Any IO error
+ */
+ void decompress(File inFile, File outFile) throws IOException;
+
+ /**
+ * Tests whether the given file is in valid gzip format.
+ *
+ * @param gzipFile file to check for gzip compression
+ * @return true if the file is in gzip format, false otherwise.
+ */
+ boolean test(File gzipFile);
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java
new file mode 100644
index 0000000..50d83e9
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.metron.integration.utils.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CompressionUtilsTest {
+ // Exercises CompressionStrategies.GZIP (compress, decompress, test) against a small text file.
+ private static final String SAMPLE_TEXT = "hello world";
+ private File tempDir;
+ private File textFile;
+ // Writes SAMPLE_TEXT to test-text-file.txt under a temp dir obtained from TestUtils.
+ @Before
+ public void setup() throws IOException {
+ tempDir = TestUtils.createTempDir(this.getClass().getName());
+ textFile = new File(tempDir, "test-text-file.txt");
+ TestUtils.write(textFile, SAMPLE_TEXT);
+ }
+ // A file produced by compress() must be accepted by test().
+ @Test
+ public void compresses_Gzip() throws IOException {
+ File gzipFile = new File(tempDir, "test-gz-compression-file.gz");
+ CompressionStrategies.GZIP.compress(textFile, gzipFile);
+ assertThat(CompressionStrategies.GZIP.test(gzipFile), equalTo(true));
+ }
+ // compress() followed by decompress() must reproduce the original text.
+ @Test
+ public void decompresses_Gzip() throws IOException {
+ File gzipFile = new File(tempDir, "test-gz-decompress.gz");
+ CompressionStrategies.GZIP.compress(textFile, gzipFile);
+ assertThat("gzipped file should exist", gzipFile.exists(), equalTo(true));
+ File unzippedText = new File(tempDir, "test-gz-decompressed.txt");
+ CompressionStrategies.GZIP.decompress(gzipFile, unzippedText);
+ assertThat("decompressed file should exist", unzippedText.exists(), equalTo(true));
+ String actual = TestUtils.read(unzippedText);
+ assertThat("decompressed text should match", actual, equalTo(SAMPLE_TEXT));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java
index b366015..fc89b89 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.utils.CompressionStrategies;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
@@ -42,6 +43,9 @@ import java.time.ZoneOffset;
import java.util.Map;
public class GeoEnrichmentLoader {
+
+ private static final String DEFAULT_RETRIES = "2";
+
private static abstract class OptionHandler implements Function<String, Option> {
}
@@ -70,6 +74,15 @@ public class GeoEnrichmentLoader {
o.setRequired(false);
return o;
}
+ }), RETRIES("re", new GeoEnrichmentLoader.OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "retries", true, "Number of geo db download retries, after an initial failure.");
+ o.setArgName("RETRIES");
+ o.setRequired(false);
+ return o;
+ }
}), TMP_DIR("t", new GeoEnrichmentLoader.OptionHandler() {
@Nullable
@Override
@@ -146,7 +159,14 @@ public class GeoEnrichmentLoader {
System.out.println("Retrieving GeoLite2 archive");
String url = GeoEnrichmentOptions.GEO_URL.get(cli, "http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz");
String tmpDir = GeoEnrichmentOptions.TMP_DIR.get(cli, "/tmp") + "/"; // Make sure there's a file separator at the end
- File localGeoFile = downloadGeoFile(url, tmpDir);
+ int numRetries = Integer.parseInt(GeoEnrichmentOptions.RETRIES.get(cli, DEFAULT_RETRIES));
+ File localGeoFile = null;
+ try {
+ localGeoFile = downloadGeoFile(url, tmpDir, numRetries);
+ } catch (IllegalStateException ies) {
+ System.err.println("Failed to download geo db file. Aborting");
+ System.exit(5);
+ }
// Want to delete the tar in event of failure
localGeoFile.deleteOnExit();
System.out.println("GeoIP files downloaded successfully");
@@ -167,26 +187,40 @@ public class GeoEnrichmentLoader {
System.out.println("Successfully created and updated new GeoIP information");
}
- protected File downloadGeoFile(String urlStr, String tmpDir) {
+ protected File downloadGeoFile(String urlStr, String tmpDir, int numRetries) {
File localFile = null;
- try {
- URL url = new URL(urlStr);
- localFile = new File(tmpDir + new File(url.getPath()).getName());
+ int attempts = 0;
+ boolean valid = false;
+ while (!valid && attempts <= numRetries) {
+ try {
+ URL url = new URL(urlStr);
+ localFile = new File(tmpDir + new File(url.getPath()).getName());
- System.out.println("Downloading " + url.toString() + " to " + localFile.getAbsolutePath());
- if (localFile.exists() && !localFile.delete()) {
- System.err.println("File already exists locally and can't be deleted. Please delete before continuing");
- System.exit(3);
+ System.out.println("Downloading " + url.toString() + " to " + localFile.getAbsolutePath());
+ if (localFile.exists() && !localFile.delete()) {
+ System.err.println(
+ "File already exists locally and can't be deleted. Please delete before continuing");
+ System.exit(3);
+ }
+ FileUtils.copyURLToFile(url, localFile, 5000, 10000);
+ if (!CompressionStrategies.GZIP.test(localFile)) {
+ throw new IOException("Invalid Gzip file");
+ } else {
+ valid = true;
+ }
+ } catch (MalformedURLException e) {
+ System.err.println("Malformed URL - aborting: " + e);
+ e.printStackTrace();
+ System.exit(4);
+ } catch (IOException e) {
+ System.err.println("Warning: Unable to copy remote GeoIP database to local file, attempt " + attempts + ": " + e);
+ e.printStackTrace();
}
- FileUtils.copyURLToFile(url, localFile, 5000, 10000);
- } catch (MalformedURLException e) {
- System.err.println("Malformed URL - aborting: " + e);
- e.printStackTrace();
- System.exit(4);
- } catch (IOException e) {
- System.err.println("Unable to copy remote GeoIP database to local file: " + e);
- e.printStackTrace();
- System.exit(5);
+ attempts++;
+ }
+ if (!valid) {
+ System.err.println("Unable to copy remote GeoIP database to local file after " + attempts + " attempts");
+ throw new IllegalStateException("Unable to download geo enrichment database.");
}
return localFile;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java
index 3fa0270..2babeee 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java
@@ -17,19 +17,24 @@
*/
package org.apache.metron.dataloads.nonbulk.geo;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.GenericOptionsParser;
-import org.junit.*;
+import org.apache.metron.common.utils.CompressionStrategies;
+import org.apache.metron.integration.utils.TestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
-import java.io.File;
-
-import static org.junit.Assert.assertTrue;
-
public class GeoEnrichmentLoaderTest {
private class MockGeoEnrichmentLoader extends GeoEnrichmentLoader {
@Override
@@ -43,7 +48,7 @@ public class GeoEnrichmentLoaderTest {
private File remoteDir;
private File tmpDir;
- @Before
+ @Before
public void setup() throws Exception {
testFolder.create();
remoteDir = testFolder.newFolder("remoteDir");
@@ -76,8 +81,10 @@ public class GeoEnrichmentLoaderTest {
@Test
public void testLoadGeoIpDatabase() throws Exception {
- File dbFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb");
- dbFile.createNewFile();
+ File dbPlainTextFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb");
+ TestUtils.write(dbPlainTextFile, "hello world");
+ File dbFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb.gz");
+ CompressionStrategies.GZIP.compress(dbPlainTextFile, dbFile);
String[] argv = {"--geo_url", "file://" + dbFile.getAbsolutePath(), "--remote_dir", remoteDir.getAbsolutePath(), "--tmp_dir", tmpDir.getAbsolutePath(), "--zk_quorum", "test:2181"};
String[] otherArgs = new GenericOptionsParser(argv).getRemainingArgs();
CommandLine cli = GeoEnrichmentLoader.GeoEnrichmentOptions.parse(new PosixParser(), otherArgs);
@@ -88,4 +95,21 @@ public class GeoEnrichmentLoaderTest {
FileSystem fs = FileSystem.get(config);
assertTrue(fs.exists(new Path(remoteDir + "/" + dbFile.getName())));
}
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void loader_throws_exception_on_bad_gzip_file() throws Exception {
+    File dbFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb");
+    dbFile.createNewFile(); // empty file: not valid gzip, so every attempt fails validation
+
+    String geoUrl = "file://" + dbFile.getAbsolutePath();
+    int numRetries = 2;
+    exception.expect(IllegalStateException.class);
+    exception.expectMessage("Unable to download geo enrichment database.");
+    GeoEnrichmentLoader loader = new MockGeoEnrichmentLoader();
+    // downloadGeoFile concatenates tmpDir and the file name, so a trailing separator is required
+    loader.downloadGeoFile(geoUrl, tmpDir.getAbsolutePath() + "/", numRetries);
+  }
+
}