Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2019/11/05 21:50:31 UTC

[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #4747: Data Anonymizer Tool

siddharthteotia commented on a change in pull request #4747: Data Anonymizer Tool
URL: https://github.com/apache/incubator-pinot/pull/4747#discussion_r342815880
 
 

 ##########
 File path: pinot-tools/src/main/java/org/apache/pinot/tools/PinotDataAndQueryAnonymizer.java
 ##########
 @@ -0,0 +1,1332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pinot.common.data.DateTimeFieldSpec;
+import org.apache.pinot.common.data.DimensionFieldSpec;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.MetricFieldSpec;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.data.TimeFieldSpec;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.segment.index.ColumnMetadata;
+import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
+import org.apache.pinot.pql.parsers.pql2.ast.AstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.BetweenPredicateAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.BooleanOperatorAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.ComparisonPredicateAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.GroupByAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.IdentifierAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.InPredicateAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.LiteralAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.OutputColumnAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.OutputColumnListAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.PredicateAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.PredicateListAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.SelectAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.StarColumnListAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.StarExpressionAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.StringLiteralAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.WhereAstNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The goal of this tool is to generate a test dataset (as Avro files) with
+ * characteristics similar to a given source dataset. The source dataset is
+ * a set of Pinot segments. The tool can be used in situations where actual
+ * source data isn't allowed to be used for the purpose of testing (regression,
+ * performance, functional, evaluation of other OLAP systems etc).
+ *
+ * The tool understands the characteristics of the given dataset (Pinot segments)
+ * and generates corresponding random data while preserving those characteristics.
+ * The tool can then also be used to generate queries for the random data.
+ *
+ * So if you have a set of production data which you want to use for testing
+ * but are unable to do so (because of security restrictions etc.), then this tool
+ * can be used to generate corresponding anonymized data and queries. Users can then
+ * use the anonymized dataset (Avro files) and generated queries for their testing.
+ *
+ * One Avro file is generated per input Pinot segment. The tool also randomizes the
+ * column names (and table name) so that the source schema is not revealed. The user is also
+ * allowed to provide a set of columns for which the data should be retained
+ * as is (not anonymized). Users should be careful when choosing these columns. Ideally
+ * these should be time (or time related) columns since they don't reveal anything, so
+ * it is fine to copy them as is from the source segments into Avro files.
+ *
+ * Steps to use this tool are as follows:
+ *
+ * STEP 1 - Download a day’s queries (from the same day the segments were downloaded) from
+ * the Pinot broker query log. You may have to post-process the log to remove noise
+ * and keep just the queries.
+ *
+ * STEP 2 - Point the tool to the query file. It will parse each query
+ * and output the set of columns participating in filters (WHERE clause).
+ *
+ * STEP 3 - Point the tool to the directory containing Pinot segments, along with
+ * the set of filter columns (identified in the previous step) and their cardinalities,
+ * to anonymize the data. This step will first read dictionaries from each input segment
+ * and build a sorted global dictionary containing a 1-1 mapping between each original
+ * column value and the corresponding derived value. Global dictionaries will be built
+ * only for the set of filter columns. For the remaining columns, we generate
+ * arbitrary random values directly into the Avro file.
+ *
+ * Please see the implementation notes further in the code explaining the global
+ * dictionary building phase and data generation phase in detail.
+ *
+ * STEP 4 - Point the tool to the directory containing the global dictionaries and column
+ * name mapping -- this directory will be the same as the outputDir used in the previous
+ * step where the tool wrote the Avro files. Also point the tool to the queryDir containing
+ * the source query files. The tool will load the global dictionaries and use them to
+ * rewrite the WHERE clause of each query. Similarly, the column name mapping file
+ * will be used to rewrite the select list of the original query. A file named
+ * "queries.generated" will be written into queryDir.
+ *
+ * Please see implementation notes further in the code explaining the query
+ * generation phase in detail.
+ *
+ * Also, please see usage examples in
+ * {@link org.apache.pinot.tools.admin.command.AnonymizeDataCommand} to learn
+ * how this tool can be invoked from command line.
+ *
+ * Limitations (to be addressed in follow-ups):
+ * (1) Multi-value columns are not yet supported
+ * (2) BYTES type is not yet supported
+ * (3) Partitioning (where the dataset is hash partitioned on a column) is not yet supported
+ * (4) ORDER BY is not yet supported in the query generator
+ * (5) Potential memory explosion for extremely high cardinality global dictionary columns
+ */
+public class PinotDataAndQueryAnonymizer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotDataAndQueryAnonymizer.class);
+
+  private final static int INT_BASE_VALUE = 1000;
+  private final static long LONG_BASE_VALUE = 100000;
+  private static final float FLOAT_BASE_VALUE = 100.23f;
+  private static final double DOUBLE_BASE_VALUE = 1000.2375;
+  private static final String DICT_FILE_EXTENSION = ".dict";
+  private static final String COLUMN_MAPPING_FILE_KEY = "columns.mapping";
+  private static final String COLUMN_MAPPING_SEPARATOR = ":";
+
+  private final String _outputDir;
+  private int _numFilesToGenerate;
+  private final String _segmentDir;
+  private final String _filePrefix;
+  // dictionaries used to generate data with same cardinality and data distribution
+  // as in source table segments
+  private final Map<String, OrigAndDerivedValueHolder> _origToDerivedValueGlobalDictionary;
+  // used to map the original column name to a generated column name
+  private final Map<String, String> _origToDerivedColumnsMap;
+
+  private final Stopwatch _timeToBuildDictionaries = Stopwatch.createUnstarted();
+  private final Stopwatch _timeToGenerateAvroFiles = Stopwatch.createUnstarted();
+
+  private Schema _pinotSchema = null;
+  private org.apache.avro.Schema _avroSchema = null;
+
+  private final Map<String, FieldSpec.DataType> _columnToDataTypeMap;
+  // name of columns to build global dictionary for and corresponding total cardinality
+  private final Map<String, Integer> _globalDictionaryColumns;
+  // name of time (or time derived) columns for which we will retain data
+  private final Set<String> _timeColumns;
+
+  private String[] _segmentDirectories;
+
+  /**
+   * Create an instance of PinotDataAndQueryAnonymizer
+   * @param segmentDir directory containing the source segments
+   * @param outputDir parent directory where Avro files will be generated
+   * @param fileNamePrefix generated Avro file name prefix
+   * @param globalDictionaryColumns filter columns to build global dictionaries for, with their cardinalities
+   * @param timeColumns time (or time derived) columns whose values are retained as is
+   */
+  public PinotDataAndQueryAnonymizer(
+      String segmentDir,
+      String outputDir,
+      String fileNamePrefix,
+      Map<String, Integer> globalDictionaryColumns,
+      Set<String> timeColumns) {
+    _outputDir = outputDir;
+    _segmentDir = segmentDir;
+    _filePrefix = fileNamePrefix;
+    _origToDerivedValueGlobalDictionary = new HashMap<>();
+    _origToDerivedColumnsMap = new HashMap<>();
+    _columnToDataTypeMap = new HashMap<>();
+    _globalDictionaryColumns = globalDictionaryColumns;
+    _timeColumns = timeColumns;
+
+    for (String column : timeColumns) {
+      // Sometimes predicates can also be on columns for which the user
+      // wants to retain the data as is, and thus these columns may be
+      // part of the filter column set as well.
+      // But since the values are retained for these columns, we
+      // don't need to build global dictionary for them. So remove
+      // these columns from the filter column set.
+      _globalDictionaryColumns.remove(column);
+    }
+
+    LOGGER.info("Columns to retain data for");
+    for (String column : _timeColumns) {
+      LOGGER.info("Column name: " + column);
+    }
+
+    LOGGER.info("Columns to build global dictionary for");
+    for (Map.Entry<String, Integer> entry : _globalDictionaryColumns.entrySet()) {
+      LOGGER.info("Column name: " + entry.getKey() + " cardinality: " + entry.getValue());
+    }
+  }
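 As a point of reference, here is a minimal sketch of how this phase might be driven programmatically. Only the constructor and buildGlobalDictionaries() shown in this diff are assumed; the column names, cardinalities and paths below are purely hypothetical, and the later data/query generation entry points are not part of this excerpt:

     Map<String, Integer> globalDictionaryColumns = new HashMap<>();
     globalDictionaryColumns.put("memberId", 100000);   // hypothetical filter column and its cardinality
     globalDictionaryColumns.put("country", 200);        // hypothetical filter column and its cardinality

     Set<String> timeColumns = new HashSet<>();
     timeColumns.add("daysSinceEpoch");                  // hypothetical time column retained as is

     PinotDataAndQueryAnonymizer anonymizer = new PinotDataAndQueryAnonymizer(
         "/path/to/segments",   // segmentDir: directory containing the source segments
         "/path/to/output",     // outputDir: where Avro files and global dictionaries are written
         "anonymized",          // fileNamePrefix for the generated Avro files
         globalDictionaryColumns,
         timeColumns);

     // build the sorted global dictionaries from the source segment dictionaries
     anonymizer.buildGlobalDictionaries();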
+
+  /*****************************************************
+   *                                                   *
+   *             Global Dictionary Builder             *
+   *                                                   *
+   *****************************************************/
+
+  /*
+   * Global Dictionary Implementation Notes
+   *
+   * We build global dictionary for a specific set of columns
+   * that participate in filter (predicates) in user queries.
+   *
+   * First step of using this tool is to feed in the actual
+   * queries and parse them to extract the set of columns
+   * that are there in WHERE clause.
+   *
+   * Once these columns are identified, we build global
+   * dictionaries for them for two main reasons
+   *
+   * (1) the cardinality of such columns is the same as in the source data
+   * (2) the distribution and order of values in such columns are the
+   *     same as in the source data.
+   *
+   * This ensures that queries with =, <, >, <=, >= predicates
+   * on the actual data yield the same results on the generated data.
+   *
+   * Global dictionary is built in three steps:
+   *
+   * Step 1:
+   *
+   * Read dictionary from each segment and insert original
+   * values into the global dictionary.
+   *
+   * The values read from each segment dictionary will be in
+   * sorted order but that is not guaranteed across segments.
+   * Secondly, a value seen in a segment might have already
+   * been inserted into the global dictionary while scanning the
+   * dictionary of a previous segment.
+   *
+   * Since overall (across segments) there is no sort order and
+   * we need to prevent duplicates, we do a linear search to
+   * detect whether the value has already been inserted.
+   *
+   * Once we finish reading dictionaries from each segment,
+   * our global dictionary is 50% built -- it has all the
+   * original values.
+   *
+   * Step 2: Sort the original value array
+   *
+   * Step 3: Generate derived values (sorted)
+   *
+   * This gives a 1-1 mapping between original values
+   * and derived values while maintaining their order.
+   *
+   * Finally, we persist the global dictionary and column
+   * name mapping to disk.
+   */
+
+
+  /**
+   * Per column holder to store both original values and corresponding
+   * derived values in global dictionary.
+   */
+  private static class OrigAndDerivedValueHolder {
+    FieldSpec.DataType _dataType;
+    Object[] _origValues;
+    Object[] _derivedValues;
+    int _index;
+    Comparator _comparator;
+
+    OrigAndDerivedValueHolder(
+        FieldSpec.DataType dataType,
+        int totalCardinality) {
+      _dataType = dataType;
+      // user specified cardinality might be slightly inaccurate depending
+      // on when the user determined the cardinality and when the source
+      // segments were given to this tool so just provision 10% additional
+      // capacity
+      _origValues = new Object[totalCardinality + (int)(0.1 * totalCardinality)];
+      // we will use index value as the actual total cardinality based
+      // on total number of values read from dictionary of each segment
+      _index = 0;
+      buildComparator();
+    }
+
+    void buildComparator() {
+      switch (_dataType) {
+        case INT:
+          _comparator = new Comparator() {
+            @Override
+            public int compare(Object o1, Object o2) {
+              return ((Integer)o1).compareTo((Integer)o2);
+            }
+          };
+          break;
+        case LONG:
+          _comparator = new Comparator() {
+            @Override
+            public int compare(Object o1, Object o2) {
+              return ((Long)o1).compareTo((Long)o2);
+            }
+          };
+          break;
+        case FLOAT:
+          _comparator = new Comparator() {
+            @Override
+            public int compare(Object o1, Object o2) {
+              return ((Float)o1).compareTo((Float) o2);
+            }
+          };
+          break;
+        case DOUBLE:
+          _comparator = new Comparator() {
+            @Override
+            public int compare(Object o1, Object o2) {
+              return ((Double)o1).compareTo((Double) o2);
+            }
+          };
+          break;
+        case STRING:
+          _comparator = new Comparator() {
+            @Override
+            public int compare(Object o1, Object o2) {
+              return ((String)o1).compareTo((String) o2);
+            }
+          };
+          break;
+        default:
+          throw new UnsupportedOperationException("global dictionary currently does not support: " + _dataType.name());
+      }
+    }
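 Side note: since every type handled above (Integer, Long, Float, Double, String) implements Comparable, the per-type anonymous comparators could arguably be collapsed; a possible simplification, not part of this patch:

     // one raw comparator covers all of the supported Comparable types
     _comparator = (o1, o2) -> ((Comparable) o1).compareTo(o2);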
+
+    void sort() {
+      Arrays.sort(_origValues, 0, _index, _comparator);
+    }
+
+    void addOrigValue(Object origValue) {
+      if (!linearSearch(origValue)) {
+        _origValues[_index++] = origValue;
+      }
+    }
+
+    void setDerivedValues(Object[] derivedValues) {
+      Preconditions.checkState(derivedValues.length == _index);
+      _derivedValues = derivedValues;
+    }
+
+    Object getDerivedValueForOrigValue(Object origValue) {
+      int index = binarySearch(0, _index - 1, origValue);
+      Preconditions.checkState(index >= 0, "Expecting origValue: " + origValue);
+      return _derivedValues[index];
+    }
+
+    // used during building global dictionary phase
+    // to prevent adding original values already
+    // seen from previous segments
+    boolean linearSearch(Object key) {
+      for (int i = 0; i < _index; i++) {
+        if (key == null) {
+          if (_origValues[i] == null) {
+            return true;
+          }
+        } else {
+          if (_origValues[i].equals(key)) {
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+
+    // used during data generation phase.
+    // since the global dictionary is fully built
+    // and sorted, we can do binary search
+    int binarySearch(int low, int high, Object key) {
+      if (low > high) return -1;
+
+      // compute the midpoint in a way that avoids potential int overflow
+      int mid = low + (high - low) / 2;
+
+      if (_origValues[mid].equals(key)) {
+        return mid;
+      }
+
+      if (isLessThan(_origValues[mid], key) < 0) {
+        return binarySearch(mid + 1, high, key);
+      }
+
+      return binarySearch(low, mid - 1, key);
+    }
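 Similarly, because _origValues[0.._index) is kept sorted with _comparator, the hand-rolled recursion could be replaced with the JDK helper; a possible alternative, not part of this patch:

     // returns the index of key within the sorted prefix, or a negative value if absent
     int index = Arrays.binarySearch(_origValues, 0, _index, key, _comparator);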
+
+    private int isLessThan(Object o1, Object o2) {
+      switch (_dataType) {
+        case INT:
+          return ((Integer) o1).compareTo((Integer)o2);
+        case LONG:
+          return ((Long) o1).compareTo((Long)o2);
+        case DOUBLE:
+          return ((Double) o1).compareTo((Double) o2);
+        case FLOAT:
+          return ((Float) o1).compareTo((Float) o2);
+        case STRING:
+          return ((String) o1).compareTo((String) o2);
+        default:
+          throw new IllegalStateException("unexpected data type: " + _dataType);
+      }
+    }
+  }
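 To make the three-step flow from the implementation notes concrete, here is a small sketch of the holder's lifecycle (visibility aside). The values are hypothetical; in the tool the derived values are produced by the generation step rather than hard-coded:

     OrigAndDerivedValueHolder holder = new OrigAndDerivedValueHolder(FieldSpec.DataType.INT, 3);

     // Step 1: collect original values from segment dictionaries (duplicates are skipped)
     holder.addOrigValue(42);
     holder.addOrigValue(7);
     holder.addOrigValue(42);   // already seen, ignored by linearSearch()
     holder.addOrigValue(19);

     // Step 2: sort the original values -> [7, 19, 42]
     holder.sort();

     // Step 3: attach derived values in the same sorted order, e.g. 7->1000, 19->1001, 42->1002
     holder.setDerivedValues(new Object[]{1000, 1001, 1002});

     // during data/query generation, an original value maps to its derived counterpart
     Object derived = holder.getDerivedValueForOrigValue(19);   // 1001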
+
+  public void buildGlobalDictionaries() throws Exception {
+    if (_globalDictionaryColumns.isEmpty()) {
+      LOGGER.info("Set of global dictionary columns is empty");
+      return;
+    }
+
+    _timeToBuildDictionaries.start();
+
+    File segmentParentDirectory = new File(_segmentDir);
+    _segmentDirectories = segmentParentDirectory.list();
+    _numFilesToGenerate = _segmentDirectories.length;
+
+    // STEP 1 for building global dictionary
+    for (String segmentDirectory : _segmentDirectories) {
+      readDictionariesFromSegment(_segmentDir + "/" + segmentDirectory);
+    }
+
+    // STEP 2 for building global dictionary
+    sortOriginalValuesInGlobalDictionaries();
+
+    // STEP 3 for building global dictionary
+    addDerivedValuesToGlobalDictionaries();
+
+    _timeToBuildDictionaries.stop();
+    LOGGER.info("Finished building global dictionaries. Time taken: {}secs", _timeToBuildDictionaries.elapsed(TimeUnit.SECONDS));
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org