You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by le...@apache.org on 2018/05/18 18:29:30 UTC

[09/13] metron git commit: METRON-1552: Add gzip file validation check to the geo loader (mmiklavc via mmiklavc) closes apache/metron#1011

METRON-1552: Add gzip file validation check to the geo loader (mmiklavc via mmiklavc) closes apache/metron#1011


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/20eaed23
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/20eaed23
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/20eaed23

Branch: refs/heads/feature/METRON-1416-upgrade-solr
Commit: 20eaed239b2552d0823d34f571b63d941c352bc9
Parents: 9ce4ba5
Author: mmiklavc <mi...@gmail.com>
Authored: Tue May 15 11:12:07 2018 -0600
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Tue May 15 11:12:07 2018 -0600

----------------------------------------------------------------------
 .../common/utils/CompressionStrategies.java     | 100 +++++++++++++++++++
 .../common/utils/CompressionStrategy.java       |  52 ++++++++++
 .../common/utils/CompressionUtilsTest.java      |  62 ++++++++++++
 .../nonbulk/geo/GeoEnrichmentLoader.java        |  70 +++++++++----
 .../nonbulk/geo/GeoEnrichmentLoaderTest.java    |  40 ++++++--
 5 files changed, 298 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java
new file mode 100644
index 0000000..f9c53c8
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipException;
+
+/**
+ * Compression strategies exposed as enum constants; each constant delegates to the strategy it wraps.
+ */
+public enum CompressionStrategies implements CompressionStrategy {
+
+  GZIP(new CompressionStrategy() { // gzip implementation built on the java.util.zip stream classes
+    @Override
+    public void compress(File inFile, File outFile) throws IOException {
+      try (FileInputStream fis = new FileInputStream(inFile);
+          FileOutputStream fos = new FileOutputStream(outFile);
+          GZIPOutputStream gzipOS = new GZIPOutputStream(fos)) { // closed by try-with-resources, last-declared first
+        byte[] buffer = new byte[1024];
+        int len;
+        while ((len = fis.read(buffer)) != -1) { // copy until end of input
+          gzipOS.write(buffer, 0, len);
+        }
+      }
+    }
+
+    @Override
+    public void decompress(File inFile, File outFile) throws IOException {
+      try (FileInputStream fis = new FileInputStream(inFile);
+          GZIPInputStream gis = new GZIPInputStream(fis); // constructed before outFile is opened
+          FileOutputStream fos = new FileOutputStream(outFile)) {
+        byte[] buffer = new byte[1024];
+        int len;
+        while ((len = gis.read(buffer)) != -1) { // copy until end of the decompressed stream
+          fos.write(buffer, 0, len);
+        }
+      }
+
+    }
+
+    @Override
+    public boolean test(File gzipFile) { // true when the file opens and its first chunk reads as gzip
+      try (FileInputStream fis = new FileInputStream(gzipFile);
+          GZIPInputStream gis = new GZIPInputStream(fis)) {
+        byte[] buffer = new byte[1024];
+        // the GZIPInputStream constructor checks the gzip header; this single read only probes the first chunk
+        gis.read(buffer); // byte count ignored; the rest of the stream is not read, so this is not a full integrity check
+      } catch (ZipException | EOFException e) { // gzip format error, or input ended early (e.g. an empty file)
+        return false;
+      } catch (IOException e) { // any other I/O failure is reported as an error rather than as "not gzip"
+        throw new IllegalStateException("Error occurred while attempting to validate gzip file", e);
+      }
+      return true;
+    }
+  });
+
+  private CompressionStrategy strategy; // delegate for this constant; assigned only in the constructor
+
+  CompressionStrategies(CompressionStrategy strategy) {
+    this.strategy = strategy;
+  }
+
+  @Override
+  public void compress(File inFile, File outFile) throws IOException {
+    strategy.compress(inFile, outFile);
+  }
+
+  @Override
+  public void decompress(File inFile, File outFile) throws IOException {
+    strategy.decompress(inFile, outFile);
+  }
+
+  @Override
+  public boolean test(File gzipFile) {
+    return strategy.test(gzipFile);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java
new file mode 100644
index 0000000..6a98c95
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import java.io.File;
+import java.io.IOException;
+
+public interface CompressionStrategy { // file-to-file compression contract
+
+  /**
+   * Compresses {@code inFile} and writes the result to {@code outFile}.
+   *
+   * @param inFile file to compress
+   * @param outFile destination path for compressed output
+   * @throws IOException on any I/O error
+   */
+  void compress(File inFile, File outFile) throws IOException;
+
+  /**
+   * Decompresses {@code inFile} and writes the result to {@code outFile}.
+   *
+   * @param inFile file to decompress
+   * @param outFile destination path for decompressed output
+   * @throws IOException on any I/O error
+   */
+  void decompress(File inFile, File outFile) throws IOException;
+
+  /**
+   * Tests whether the file is in proper gzip format.
+   *
+   * @param gzipFile file to check for gzip compression
+   * @return true if the file is in gzip format, false otherwise.
+   */
+  boolean test(File gzipFile);
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java
new file mode 100644
index 0000000..50d83e9
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.metron.integration.utils.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CompressionUtilsTest { // exercises CompressionStrategies.GZIP against a small text fixture
+
+  private static final String SAMPLE_TEXT = "hello world";
+  private File tempDir;
+  private File textFile;
+
+  @Before
+  public void setup() throws IOException { // writes the SAMPLE_TEXT fixture file before each test
+    tempDir = TestUtils.createTempDir(this.getClass().getName());
+    textFile = new File(tempDir, "test-text-file.txt");
+    TestUtils.write(textFile, SAMPLE_TEXT);
+  }
+
+  @Test
+  public void compresses_Gzip() throws IOException {
+    File gzipFile = new File(tempDir, "test-gz-compression-file.gz");
+    CompressionStrategies.GZIP.compress(textFile, gzipFile);
+    assertThat(CompressionStrategies.GZIP.test(gzipFile), equalTo(true)); // output of compress() must pass the gzip check
+  }
+
+  @Test
+  public void decompresses_Gzip() throws IOException { // round trip: compress, decompress, compare to the original text
+    File gzipFile = new File(tempDir, "test-gz-decompress.gz");
+    CompressionStrategies.GZIP.compress(textFile, gzipFile);
+    assertThat("gzipped file should exist", gzipFile.exists(), equalTo(true));
+    File unzippedText = new File(tempDir, "test-gz-decompressed.txt");
+    CompressionStrategies.GZIP.decompress(gzipFile, unzippedText);
+    assertThat("decompressed file should exist", unzippedText.exists(), equalTo(true));
+    String actual = TestUtils.read(unzippedText);
+    assertThat("decompressed text should match", actual, equalTo(SAMPLE_TEXT));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java
index b366015..fc89b89 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.utils.CompressionStrategies;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
 
@@ -42,6 +43,9 @@ import java.time.ZoneOffset;
 import java.util.Map;
 
 public class GeoEnrichmentLoader {
+
+  private static final String DEFAULT_RETRIES = "2";
+
   private static abstract class OptionHandler implements Function<String, Option> {
   }
 
@@ -70,6 +74,15 @@ public class GeoEnrichmentLoader {
         o.setRequired(false);
         return o;
       }
+    }), RETRIES("re", new GeoEnrichmentLoader.OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "retries", true, "Number of geo db download retries, after an initial failure.");
+        o.setArgName("RETRIES");
+        o.setRequired(false);
+        return o;
+      }
     }), TMP_DIR("t", new GeoEnrichmentLoader.OptionHandler() {
       @Nullable
       @Override
@@ -146,7 +159,14 @@ public class GeoEnrichmentLoader {
     System.out.println("Retrieving GeoLite2 archive");
     String url = GeoEnrichmentOptions.GEO_URL.get(cli, "http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz");
     String tmpDir = GeoEnrichmentOptions.TMP_DIR.get(cli, "/tmp") + "/"; // Make sure there's a file separator at the end
-    File localGeoFile = downloadGeoFile(url, tmpDir);
+    int numRetries = Integer.parseInt(GeoEnrichmentOptions.RETRIES.get(cli, DEFAULT_RETRIES));
+    File localGeoFile = null;
+    try {
+      localGeoFile = downloadGeoFile(url, tmpDir, numRetries);
+    } catch (IllegalStateException ies) {
+      System.err.println("Failed to download geo db file. Aborting");
+      System.exit(5);
+    }
     // Want to delete the tar in event of failure
     localGeoFile.deleteOnExit();
     System.out.println("GeoIP files downloaded successfully");
@@ -167,26 +187,40 @@ public class GeoEnrichmentLoader {
     System.out.println("Successfully created and updated new GeoIP information");
   }
 
-  protected File downloadGeoFile(String urlStr, String tmpDir) {
+  protected File downloadGeoFile(String urlStr, String tmpDir, int  numRetries) { // returns the local file once it passes the gzip check
     File localFile = null;
-    try {
-      URL url = new URL(urlStr);
-      localFile = new File(tmpDir + new File(url.getPath()).getName());
+    int attempts = 0; // zero-based inside the loop, so the first warning below reports "attempt 0"
+    boolean valid = false;
+    while (!valid && attempts <= numRetries) { // "<=" allows one initial attempt plus numRetries retries
+      try {
+        URL url = new URL(urlStr);
+        localFile = new File(tmpDir + new File(url.getPath()).getName()); // plain concatenation: tmpDir must already end with a separator
 
-      System.out.println("Downloading " + url.toString() + " to " + localFile.getAbsolutePath());
-      if (localFile.exists() && !localFile.delete()) {
-        System.err.println("File already exists locally and can't be deleted.  Please delete before continuing");
-        System.exit(3);
+        System.out.println("Downloading " + url.toString() + " to " + localFile.getAbsolutePath());
+        if (localFile.exists() && !localFile.delete()) {
+          System.err.println(
+              "File already exists locally and can't be deleted.  Please delete before continuing");
+          System.exit(3);
+        }
+        FileUtils.copyURLToFile(url, localFile, 5000, 10000);
+        if (!CompressionStrategies.GZIP.test(localFile)) { // a download that fails the gzip check is treated like a failed copy
+          throw new IOException("Invalid Gzip file");
+        } else {
+          valid = true;
+        }
+      } catch (MalformedURLException e) { // retrying cannot fix a bad URL, so exit immediately
+        System.err.println("Malformed URL - aborting: " + e);
+        e.printStackTrace();
+        System.exit(4);
+      } catch (IOException e) { // log and fall through to the next attempt
+        System.err.println("Warning: Unable to copy remote GeoIP database to local file, attempt " + attempts + ": " + e);
+        e.printStackTrace();
       }
-      FileUtils.copyURLToFile(url, localFile, 5000, 10000);
-    } catch (MalformedURLException e) {
-      System.err.println("Malformed URL - aborting: " + e);
-      e.printStackTrace();
-      System.exit(4);
-    } catch (IOException e) {
-      System.err.println("Unable to copy remote GeoIP database to local file: " + e);
-      e.printStackTrace();
-      System.exit(5);
+      attempts++;
+    }
+    if (!valid) { // every attempt failed
+      System.err.println("Unable to copy remote GeoIP database to local file after " + attempts + " attempts");
+      throw new IllegalStateException("Unable to download geo enrichment database.");
     }
     return localFile;
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java
index 3fa0270..2babeee 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java
@@ -17,19 +17,24 @@
  */
 package org.apache.metron.dataloads.nonbulk.geo;
 
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.PosixParser;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.junit.*;
+import org.apache.metron.common.utils.CompressionStrategies;
+import org.apache.metron.integration.utils.TestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
-
-import static org.junit.Assert.assertTrue;
-
 public class GeoEnrichmentLoaderTest {
   private class MockGeoEnrichmentLoader extends GeoEnrichmentLoader {
     @Override
@@ -43,7 +48,7 @@ public class GeoEnrichmentLoaderTest {
   private File remoteDir;
   private File tmpDir;
 
-    @Before
+  @Before
   public void setup() throws Exception {
       testFolder.create();
       remoteDir = testFolder.newFolder("remoteDir");
@@ -76,8 +81,10 @@ public class GeoEnrichmentLoaderTest {
 
   @Test
   public void testLoadGeoIpDatabase() throws Exception {
-    File dbFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb");
-    dbFile.createNewFile();
+    File dbPlainTextFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb");
+    TestUtils.write(dbPlainTextFile, "hello world");
+    File dbFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb.gz");
+    CompressionStrategies.GZIP.compress(dbPlainTextFile, dbFile);
     String[] argv = {"--geo_url", "file://" + dbFile.getAbsolutePath(), "--remote_dir", remoteDir.getAbsolutePath(), "--tmp_dir", tmpDir.getAbsolutePath(), "--zk_quorum", "test:2181"};
     String[] otherArgs = new GenericOptionsParser(argv).getRemainingArgs();
     CommandLine cli = GeoEnrichmentLoader.GeoEnrichmentOptions.parse(new PosixParser(), otherArgs);
@@ -88,4 +95,21 @@ public class GeoEnrichmentLoaderTest {
     FileSystem fs = FileSystem.get(config);
     assertTrue(fs.exists(new Path(remoteDir + "/" + dbFile.getName())));
   }
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void loader_throws_exception_on_bad_gzip_file() throws Exception {
+    File dbFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb");
+    dbFile.createNewFile(); // never written to or gzip-compressed, so it is not a valid gzip source
+
+    String geoUrl = "file://" + dbFile.getAbsolutePath();
+    int numRetries = 2;
+    exception.expect(IllegalStateException.class);
+    exception.expectMessage("Unable to download geo enrichment database.");
+    GeoEnrichmentLoader loader = new MockGeoEnrichmentLoader();
+    loader.downloadGeoFile(geoUrl, tmpDir.getAbsolutePath(), numRetries); // NOTE(review): no trailing separator is appended to tmpDir here, unlike the "/tmp" + "/" call site; confirm intended
+  }
+
 }