Posted to dev@metron.apache.org by nickwallen <gi...@git.apache.org> on 2017/06/22 21:55:30 UTC

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

GitHub user nickwallen opened a pull request:

    https://github.com/apache/metron/pull/622

    METRON-1005 Create Decodable Row Key for Profiler

    To be able to answer the types of questions that I outlined in [METRON-450](https://issues.apache.org/jira/browse/METRON-450), we need a row key that is decodable. Right now there is no logic to decode a row key, nor is the existing row key easily decodable.
    
    Once the row keys can be decoded, you could scan all of the row keys in the Profiler's HBase table, decode each of them, and extract things like the names of all your profiles, the names of the entities within a profile, and the period duration of a given profile.
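As a rough illustration of those questions, the sketch below assumes a scan has already decoded each row key into its fields and then groups them by profile. `RowKeyInventorySketch`, `DecodedKey`, and the sample profile names are hypothetical; in this PR the decoder returns a `ProfileMeasurement` instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Sketch: answer inventory questions from row keys that have already been decoded.
 * DecodedKey is a hypothetical stand-in for whatever a decoder returns.
 */
final class RowKeyInventorySketch {

  /** Hypothetical holder for the fields recovered from one decoded row key. */
  static final class DecodedKey {
    final String profile;
    final String entity;
    final long periodDurationMillis;

    DecodedKey(String profile, String entity, long periodDurationMillis) {
      this.profile = profile;
      this.entity = entity;
      this.periodDurationMillis = periodDurationMillis;
    }
  }

  /** Profile name -> sorted set of the entities that appear under it. */
  static Map<String, Set<String>> entitiesByProfile(List<DecodedKey> keys) {
    Map<String, Set<String>> result = new TreeMap<>();
    for (DecodedKey k : keys) {
      result.computeIfAbsent(k.profile, p -> new TreeSet<>()).add(k.entity);
    }
    return result;
  }

  /** Profile name -> period duration(s) in milliseconds found in its row keys. */
  static Map<String, Set<Long>> periodDurationsByProfile(List<DecodedKey> keys) {
    Map<String, Set<Long>> result = new TreeMap<>();
    for (DecodedKey k : keys) {
      result.computeIfAbsent(k.profile, p -> new TreeSet<>()).add(k.periodDurationMillis);
    }
    return result;
  }

  public static void main(String[] args) {
    List<DecodedKey> keys = Arrays.asList(
        new DecodedKey("bytes-out", "10.0.0.1", 900_000L),
        new DecodedKey("bytes-out", "10.0.0.2", 900_000L),
        new DecodedKey("flows", "10.0.0.1", 60_000L));
    System.out.println(entitiesByProfile(keys).keySet()); // [bytes-out, flows]
    System.out.println(entitiesByProfile(keys));          // {bytes-out=[10.0.0.1, 10.0.0.2], flows=[10.0.0.1]}
    System.out.println(periodDurationsByProfile(keys));   // {bytes-out=[900000], flows=[60000]}
  }
}
```

Collecting durations into a set rather than a single value leaves room for a profile whose period duration was changed at some point, since each row key records its own duration.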
    
    - [ ] Do not merge.  Opening this PR for review and feedback.  I still need to run this through manual testing.
    
    - [ ] **WARNING**: This change is NOT backwards compatible.  The row key format has changed, and data written with the legacy row key cannot be read by the new code in this PR.  Should I make this backwards compatible?  I could put these changes in a new `RowKeyBuilder` implementation, which would allow for backwards compatibility.
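One way the backwards-compatibility question could be approached (a hypothetical idea, not something this PR does) is to inspect the leading bytes of each key and route it to the matching builder. The sketch below checks for the magic number (77) and version byte (1) that the `DecodableRowKeyBuilder` in this PR writes, in big-endian order, at the start of every key. `RowKeyFormatSniffer` and `looksDecodable` are illustrative names. This is only a heuristic: a legacy salted key could begin with the same three bytes by chance, so a real dispatcher would still have to handle decode failures.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
 * Sketch: decide whether a row key appears to use the decodable layout by
 * checking its leading magic number (short) and version (byte).
 */
final class RowKeyFormatSniffer {
  // Values mirror MAGIC_NUMBER and VERSION in the DecodableRowKeyBuilder diff.
  static final short MAGIC_NUMBER = 77;
  static final byte VERSION = 1;

  static boolean looksDecodable(byte[] rowKey) {
    if (rowKey == null || rowKey.length < Short.BYTES + 1) {
      return false; // too short to hold the magic number and version
    }
    ByteBuffer buffer = ByteBuffer.wrap(rowKey).order(ByteOrder.BIG_ENDIAN);
    return buffer.getShort() == MAGIC_NUMBER && buffer.get() == VERSION;
  }

  public static void main(String[] args) {
    byte[] decodable = {0x00, 0x4D, 0x01, 0x00}; // 0x004D == 77, then version 1
    byte[] legacy = {0x12, 0x34, 0x56, 0x78};     // arbitrary leading bytes
    System.out.println(looksDecodable(decodable)); // true
    System.out.println(looksDecodable(legacy));    // false
  }
}
```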
    
    ## Pull Request Checklist
    - [x] Is there a JIRA ticket associated with this PR? If not one needs to be created at [Metron Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel). 
    - [x] Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    - [ ] Have you included steps to reproduce the behavior or problem that is being changed or addressed?
    - [ ] Have you included steps or a guide to how the change may be verified and tested manually?
    - [x] Have you ensured that the full suite of tests and checks have been executed in the root incubating-metron folder via:
    - [x] Have you written or updated unit tests and/or integration tests to verify your changes?
    - [ ] Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?
    
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nickwallen/metron METRON-1005

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/metron/pull/622.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #622
    
----
commit edd7fa6946b4bcb5975c8497ab5550201718e426
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-06-22T21:45:40Z

    METRON-1005 Create Decodable Row Key for Profiler

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129358521
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.</li>
    + * <li>profile - The name of the profile.</li>
    + * <li>entity - The name of the entity being profiled.</li>
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.</li>
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.</li>
    + * <li>period duration - The duration of each period in milliseconds.</li>
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines a sane maximum field length to guard against malformed or oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the HBase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
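    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm.
    +    // capture the minimum before 'end' is reassigned, otherwise the range collapses to a single point
    +    long lower = Math.min(start, end);
    +    end = Math.max(start, end);
    +    start = lower;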
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param period The period in which the measurement was taken.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = Bytes.toString(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = Bytes.toString(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = Bytes.toString(groupBytes);
    +        groups.add(group);
    +      }
    --- End diff --
    
    The "overall review comments" are at https://github.com/apache/metron/pull/622#pullrequestreview-51932717 and may not be visible "above" depending on your current view.
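The layout written by `encode` and read back by `decode` in the diff above can be exercised end to end with the self-contained sketch below. It is a simplification, not the PR's class: it uses only `java.nio`, takes the salt as a parameter because `encodeSalt` is not shown in the diff, and assumes `encodeGroups` writes a group count followed by length-prefixed groups, which is what the decode loop reads. It decodes with an explicit UTF-8 charset to match the UTF-8 bytes produced by HBase's `Bytes.toBytes(String)`. `RowKeyLayoutSketch`, `Decoded`, and `readField` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Simplified sketch of the decodable row key layout (not the PR's class):
 * magic (short) | version (byte) | salt | profile | entity | group count (int) + groups |
 * period id (long) | period duration millis (long).
 * Salt, profile, entity, and each group are written as an int length followed by the bytes.
 */
final class RowKeyLayoutSketch {
  static final short MAGIC_NUMBER = 77;
  static final byte VERSION = 1;

  /** The fields recovered from a row key; the salt is skipped because it carries no information. */
  static final class Decoded {
    final String profile;
    final String entity;
    final List<String> groups;
    final long periodId;
    final long periodDurationMillis;

    Decoded(String profile, String entity, List<String> groups, long periodId, long periodDurationMillis) {
      this.profile = profile;
      this.entity = entity;
      this.groups = groups;
      this.periodId = periodId;
      this.periodDurationMillis = periodDurationMillis;
    }
  }

  static byte[] encode(byte[] salt, String profile, String entity, List<String> groups,
                       long periodId, long periodDurationMillis) {
    byte[] profileB = profile.getBytes(StandardCharsets.UTF_8);
    byte[] entityB = entity.getBytes(StandardCharsets.UTF_8);
    List<byte[]> groupBs = new ArrayList<>();
    int groupsSize = Integer.BYTES; // the group count
    for (String group : groups) {
      byte[] b = group.getBytes(StandardCharsets.UTF_8);
      groupBs.add(b);
      groupsSize += Integer.BYTES + b.length;
    }
    int capacity = Short.BYTES + 1
        + Integer.BYTES + salt.length
        + Integer.BYTES + profileB.length
        + Integer.BYTES + entityB.length
        + groupsSize + Long.BYTES * 2;
    ByteBuffer buffer = ByteBuffer.allocate(capacity).order(ByteOrder.BIG_ENDIAN)
        .putShort(MAGIC_NUMBER).put(VERSION)
        .putInt(salt.length).put(salt)
        .putInt(profileB.length).put(profileB)
        .putInt(entityB.length).put(entityB)
        .putInt(groupBs.size());
    for (byte[] b : groupBs) {
      buffer.putInt(b.length).put(b);
    }
    buffer.putLong(periodId).putLong(periodDurationMillis);
    return buffer.array();
  }

  static Decoded decode(byte[] rowKey) {
    ByteBuffer buffer = ByteBuffer.wrap(rowKey).order(ByteOrder.BIG_ENDIAN);
    if (buffer.getShort() != MAGIC_NUMBER || buffer.get() != VERSION) {
      throw new IllegalArgumentException("Not a decodable row key");
    }
    readField(buffer); // skip the salt
    String profile = new String(readField(buffer), StandardCharsets.UTF_8);
    String entity = new String(readField(buffer), StandardCharsets.UTF_8);
    int groupCount = buffer.getInt();
    List<String> groups = new ArrayList<>();
    for (int i = 0; i < groupCount; i++) {
      groups.add(new String(readField(buffer), StandardCharsets.UTF_8));
    }
    long periodId = buffer.getLong();
    long periodDurationMillis = buffer.getLong();
    return new Decoded(profile, entity, groups, periodId, periodDurationMillis);
  }

  /** Reads one int-length-prefixed field, rejecting lengths that overrun the buffer. */
  private static byte[] readField(ByteBuffer buffer) {
    int length = buffer.getInt();
    if (length < 0 || length > buffer.remaining()) {
      throw new IllegalArgumentException("Invalid field length: " + length);
    }
    byte[] bytes = new byte[length];
    buffer.get(bytes);
    return bytes;
  }

  public static void main(String[] args) {
    byte[] key = encode(new byte[] {5}, "p1", "e", Arrays.asList("weekday"), 100L, 900_000L);
    Decoded d = decode(key);
    System.out.println(key.length); // 50
    System.out.println(d.profile + " " + d.entity + " " + d.groups + " " + d.periodId + " " + d.periodDurationMillis);
    // prints: p1 e [weekday] 100 900000
  }
}
```

For the sample key, the 50 bytes break down as 2 (magic number) + 1 (version) + 5 (salt) + 6 (profile) + 5 (entity) + 15 (group count plus one group) + 16 (period id and duration). Unlike the PR, the sketch accepts zero-length fields and bounds each length by the bytes remaining rather than by `MAX_FIELD_LENGTH`.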



[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/metron/pull/622
  
    > Create a Profile Audit Log table in HBase
    
    This is an interesting idea, Matt.  I like it.  Along these same lines I was thinking of a table of contents.  I think your audit log idea is one way to implement a table of contents. 
    
    I have looked at OpenTSDB, a TSDB implementation backed by HBase, and it uses a table of contents approach.  The ToC records metadata about the time series data that is stored.
    
    But I don't think these ideas are mutually exclusive with a decodable row key.  The decodable row key would allow us to rebuild the ToC should it become corrupted or lost.
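As a rough illustration of rebuilding a table of contents from decodable keys, the sketch below aggregates already-decoded keys by profile, then by entity, recording the first and last period stored for each series. `TocRebuildSketch`, `KeyInfo`, `TocEntry`, and the choice of what the ToC holds are all hypothetical; it assumes the table has already been scanned and each key decoded.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Sketch: rebuild a table of contents (ToC) by aggregating decoded row keys.
 * KeyInfo and TocEntry are hypothetical types, not Metron classes.
 */
final class TocRebuildSketch {

  /** The fields of one decoded row key that this ToC cares about. */
  static final class KeyInfo {
    final String profile;
    final String entity;
    final long periodId;

    KeyInfo(String profile, String entity, long periodId) {
      this.profile = profile;
      this.entity = entity;
      this.periodId = periodId;
    }
  }

  /** The range of periods stored for one profile/entity series. */
  static final class TocEntry {
    long firstPeriod = Long.MAX_VALUE;
    long lastPeriod = Long.MIN_VALUE;

    void add(long periodId) {
      firstPeriod = Math.min(firstPeriod, periodId);
      lastPeriod = Math.max(lastPeriod, periodId);
    }

    @Override
    public String toString() {
      return firstPeriod + ".." + lastPeriod;
    }
  }

  /** Aggregates decoded keys into profile -> entity -> ToC entry, sorted by name. */
  static Map<String, Map<String, TocEntry>> rebuild(Iterable<KeyInfo> decodedKeys) {
    Map<String, Map<String, TocEntry>> toc = new TreeMap<>();
    for (KeyInfo k : decodedKeys) {
      toc.computeIfAbsent(k.profile, p -> new TreeMap<>())
         .computeIfAbsent(k.entity, e -> new TocEntry())
         .add(k.periodId);
    }
    return toc;
  }

  public static void main(String[] args) {
    List<KeyInfo> keys = Arrays.asList(
        new KeyInfo("flows", "10.0.0.1", 1002L),
        new KeyInfo("flows", "10.0.0.2", 1000L),
        new KeyInfo("flows", "10.0.0.1", 1001L));
    System.out.println(rebuild(keys)); // {flows={10.0.0.1=1001..1002, 10.0.0.2=1000..1000}}
  }
}
```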
    
    Are you thinking that a decodable row key is not needed at all?



[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r127329675
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java ---
    @@ -44,7 +46,17 @@
      * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
      * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
      * </ul>
    + *
    + * This row key builder has no logic to decode a row key, nor is the row key generated by this builder
    + * easily decodable.  More specifically, the profile, entity, groups and period that make up the row key
    + * cannot be extracted from a previously generated row key.  This makes it difficult to answer questions
    + * like: What entities are included in this profile?  What is the period for this profile?  Use the
    + * DecodableRowKeyBuilder instead.
    + *
    + * @deprecated Replaced by DecodableRowKeyBuilder
    + * @see DecodableRowKeyBuilder
      */
    +@Deprecated
     public class SaltyRowKeyBuilder implements RowKeyBuilder {
    --- End diff --
    
    I marked the old `RowKeyBuilder` implementation, `SaltyRowKeyBuilder`, as deprecated.



[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r127326600
  
    --- Diff: metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/RowKeyBuilderFactory.java ---
    @@ -0,0 +1,125 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.client.stellar;
    +
    +import org.apache.commons.beanutils.PropertyUtils;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.metron.common.utils.ReflectionUtils;
    +import org.apache.metron.profiler.hbase.RowKeyBuilder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_ROW_KEY_BUILDER;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * A Factory class that can create a RowKeyBuilder based on global property values.
    + */
    +public class RowKeyBuilderFactory {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(RowKeyBuilderFactory.class);
    +
    +  /**
    +   * Create a RowKeyBuilder.
    +   * @param global The global properties.
    +   * @return A RowKeyBuilder instantiated using the global property values.
    +   */
    +  public static RowKeyBuilder create(Map<String, Object> global) {
    +    String rowKeyBuilderClass = PROFILER_ROW_KEY_BUILDER.get(global, String.class);
    +    LOG.debug("profiler client: {}={}", PROFILER_ROW_KEY_BUILDER, rowKeyBuilderClass);
    +
    +    // instantiate the RowKeyBuilder
    +    RowKeyBuilder builder = ReflectionUtils.createInstance(rowKeyBuilderClass);
    +    setSaltDivisor(global, builder);
    +    setPeriodDuration(global, builder);
    --- End diff --
    
    But I think this actually turned out worse than the alternative of just adding `RowKeyBuilder.setSaltDivisor` and polluting the interface.
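The trade-off here is roughly compile-time versus runtime checking. The sketch below shows a reflection-based setter in plain `java.lang.reflect`. The factory diff imports Commons BeanUtils `PropertyUtils`, but the body of its `setSaltDivisor` helper is not shown, so this is not that code; `BuilderConfigSketch`, `KeyBuilder`, `ConfigurableBuilder`, and `setSaltDivisorReflectively` are hypothetical names.

```java
import java.lang.reflect.Method;

/**
 * Sketch of reflection-based configuration: a factory sets a property without
 * the builder interface declaring a setter. All names are hypothetical.
 */
final class BuilderConfigSketch {

  /** Hypothetical builder interface that deliberately has no setters. */
  interface KeyBuilder {
    int saltDivisor();
  }

  /** Hypothetical implementation that happens to expose a bean-style setter. */
  static final class ConfigurableBuilder implements KeyBuilder {
    private int saltDivisor = 1000;

    public void setSaltDivisor(int saltDivisor) {
      this.saltDivisor = saltDivisor;
    }

    @Override
    public int saltDivisor() {
      return saltDivisor;
    }
  }

  /**
   * Looks up a public setSaltDivisor(int) at runtime and invokes it if present.
   * Returns false when the implementation has no such setter, so a missing or
   * misspelled setter surfaces only at runtime, never at compile time.
   */
  static boolean setSaltDivisorReflectively(Object builder, int divisor) {
    try {
      Method setter = builder.getClass().getMethod("setSaltDivisor", int.class);
      setter.invoke(builder, divisor);
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Could not set salt divisor", e);
    }
  }

  public static void main(String[] args) {
    ConfigurableBuilder configurable = new ConfigurableBuilder();
    System.out.println(setSaltDivisorReflectively(configurable, 5) + " " + configurable.saltDivisor()); // true 5

    KeyBuilder fixed = () -> 1000; // an implementation without the setter
    System.out.println(setSaltDivisorReflectively(fixed, 5) + " " + fixed.saltDivisor()); // false 1000
  }
}
```

Declaring the setter on the interface instead would force every implementation to provide it (the "polluting the interface" cost), but a missing setter would then be a compile error rather than a silently unconfigured builder.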



[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/622
  
    I do not like the idea of using a `profileSpecSerialNumber`.  I don't think this buys us much beyond some space savings in HBase, at the expense of complexity on reads (we would need to keep a profile index resident in memory, in sync with HBase, for lookups).
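To put a rough number on the space savings: in the decodable layout the profile name is stored as a 4-byte length followed by its UTF-8 bytes. Assuming, hypothetically, a 4-byte serial number in its place, the per-key saving is the name's UTF-8 length. The profile name and row-key count below are made up. The figure also ignores that HBase stores the row key alongside each cell, and any compression or block encoding, both of which change the real number.

```java
import java.nio.charset.StandardCharsets;

/**
 * Back-of-the-envelope sketch: bytes saved in each row key if the
 * int-length-prefixed profile name were replaced by a fixed-width serial number.
 */
final class SerialNumberSavingsSketch {

  /** Bytes saved per row key, before any HBase compression or block encoding. */
  static long savedBytesPerKey(String profileName, int serialNumberBytes) {
    int nameBytes = Integer.BYTES + profileName.getBytes(StandardCharsets.UTF_8).length;
    return nameBytes - serialNumberBytes;
  }

  public static void main(String[] args) {
    long perKey = savedBytesPerKey("bytes-out-per-host", Integer.BYTES); // assume a 4-byte serial number
    long rowKeys = 1_000_000_000L; // hypothetical number of stored row keys
    System.out.println(perKey);           // 18
    System.out.println(perKey * rowKeys); // 18000000000
  }
}
```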



[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129190841
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.</li>
    + * <li>profile - The name of the profile.</li>
    + * <li>entity - The name of the entity being profiled.</li>
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.</li>
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.</li>
    + * <li>period duration - The duration of each period in milliseconds.</li>
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines a sane maximum field length to guard against malformed or oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the HBase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
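    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm.
    +    // capture the minimum before 'end' is reassigned, otherwise the range collapses to a single point
    +    long lower = Math.min(start, end);
    +    end = Math.max(start, end);
    +    start = lower;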
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param period The period in which the measurement was taken.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = Bytes.toString(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = Bytes.toString(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = Bytes.toString(groupBytes);
    +        groups.add(group);
    +      }
    --- End diff --
    
    This proposed encoding/decoding only delivers the entity value and group value(s), not the entity characteristics and grouping(s) that were evaluated to generate them, so their meaning is not necessarily clear from the row key alone.  I would expect to be able to reproduce not just the ProfilePeriod, but the Profile specification, including the meaning of the entity value and group values.  To do this, the full Profile specification needs to be retained somewhere, somehow (similar to how you have retained the periodDuration).  Please see my overall review comments.
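    A minimal sketch of the point above, using a simplified, hypothetical stand-in for the diff's group encoding (`GroupRoundTrip` is not Metron code). In the diff, `encodeGroups()` stores each group via `String.valueOf(group)` and `decode()` rebuilds each group as a `String`, so a non-string group value comes back as text and its original type is not recoverable from the row key.

    ```java
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical, simplified stand-in for the diff's group encoding:
    // [count][len][bytes][len][bytes]... with every value stored via String.valueOf().
    final class GroupRoundTrip {

      static byte[] encodeGroups(List<Object> groups) {
        List<byte[]> encoded = new ArrayList<>();
        int size = Integer.BYTES; // room for the group count
        for (Object group : groups) {
          byte[] bytes = String.valueOf(group).getBytes(StandardCharsets.UTF_8);
          encoded.add(bytes);
          size += Integer.BYTES + bytes.length; // length prefix + value
        }
        ByteBuffer buffer = ByteBuffer.allocate(size).putInt(groups.size());
        for (byte[] bytes : encoded) {
          buffer.putInt(bytes.length).put(bytes);
        }
        return buffer.array();
      }

      // Every group comes back as a String; the original type is lost.
      static List<Object> decodeGroups(byte[] encodedGroups) {
        ByteBuffer buffer = ByteBuffer.wrap(encodedGroups);
        int count = buffer.getInt();
        List<Object> groups = new ArrayList<>();
        for (int i = 0; i < count; i++) {
          byte[] bytes = new byte[buffer.getInt()];
          buffer.get(bytes);
          groups.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return groups;
      }

      public static void main(String[] args) {
        List<Object> original = Arrays.asList("weekday", 7);
        List<Object> decoded = decodeGroups(encodeGroups(original));
        System.out.println(decoded);                  // [weekday, 7]
        System.out.println(original.equals(decoded)); // false: 7 came back as "7"
      }
    }
    ```

    Recording the Profile specification (or a reference to it) alongside the data is one way to restore that lost meaning.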


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r127327126
  
    --- Diff: metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml ---
    @@ -29,7 +29,7 @@ components:
                 - name: "saltDivisor"
    --- End diff --
    
    Notice that the legacy `RowKeyBuilder`, the `SaltyRowKeyBuilder`, is still the default.  If a user wants to use the new `RowKeyBuilder` then they need to change the flux file here and specify `org.apache.metron.profiler.hbase.DecodableRowKeyBuilder`.


[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on the issue:

    https://github.com/apache/metron/pull/622
  
    @nickwallen brought up the issue of wildcard queries on our rowkeys.  It has always bothered me that we can't do wildcard queries on groups.  If you have, for example, a single groupBy based on day of week, that's just 7 possible values, and if you want them all you could just do 7 queries and combine them.  But if you have three groupBys with 7, 31, and 256 possible values, then simulating a wildcard query would take 7 * 31 * 256 = 55,552 individual queries!  Of course you would just do an HBase scan instead, but it would require a full table scan to select the desired time range.
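    The arithmetic behind that figure, as a small sketch: emulating a wildcard over groups with point lookups needs one lookup per combination of group values, and (assuming one lookup per fully specified row key) that count repeats for every period in the time range. `WildcardCost` is a hypothetical helper.

    ```java
    // Hypothetical helper: counts the point lookups needed to emulate a wildcard
    // over groups when every lookup must name one exact combination of group values.
    final class WildcardCost {

      static long pointLookups(long periods, long... groupCardinalities) {
        long combinations = 1;
        for (long cardinality : groupCardinalities) {
          combinations = Math.multiplyExact(combinations, cardinality); // fail loudly on overflow
        }
        return Math.multiplyExact(combinations, periods);
      }

      public static void main(String[] args) {
        System.out.println(pointLookups(1, 7));          // 7: one day-of-week groupBy
        System.out.println(pointLookups(1, 7, 31, 256)); // 55552: three groupBys
      }
    }
    ```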
    
    I propose that we reorder the rowkey elements to support prefix queries on Profile and time range, with wildcarding primarily on groups and secondarily on entities, i.e.:
    `<salt><magic><profileHash><period><entity><groups>`
    
    So if I want the results for all rows in a time range regarding entity "192.168.222.123" regardless of group, I can query it, and if I want all rows in a time range regardless of entity value or group, I can query that too, as efficiently as an ordinary time range query.  What do you think?
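    A rough sketch of the proposed layout, under assumptions that are not part of the proposal: a 4-byte salt, the existing 2-byte magic number, an MD5 of the profile name as `profileHash`, and an 8-byte big-endian period. `ProposedKeyLayout` is hypothetical. HBase sorts row keys by unsigned byte order, and non-negative big-endian longs sort numerically, so within one salt bucket a period range becomes a contiguous key range covering every entity and group. If the salt stays `md5(period) % N` as in the current diff, consecutive periods fall into different buckets, so a time-range read would still need one scan per salt value (N scans); the stop row uses `last + 1` because scan stop rows are conventionally exclusive.

    ```java
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical layout for the proposal: <salt><magic><profileHash><period><entity><groups>.
    // Field widths are assumptions: int salt, short magic, 16-byte MD5 hash, long period.
    final class ProposedKeyLayout {

      static final short MAGIC_NUMBER = 77; // reused from the diff for illustration

      static byte[] profileHash(String profile) {
        try {
          return MessageDigest.getInstance("MD5").digest(profile.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
          throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
      }

      // The fixed-width prefix shared by every entity/group row of one profile in one period.
      static byte[] prefix(int salt, byte[] profileHash, long period) {
        return ByteBuffer.allocate(Integer.BYTES + Short.BYTES + profileHash.length + Long.BYTES)
            .putInt(salt)            // ByteBuffer defaults to big-endian
            .putShort(MAGIC_NUMBER)
            .put(profileHash)
            .putLong(period)
            .array();
      }

      // One [startRow, stopRow) pair per salt bucket covers periods first..last
      // for every entity and group value, because those fields follow the period.
      static List<byte[][]> scanRanges(String profile, long first, long last, int saltDivisor) {
        byte[] hash = profileHash(profile);
        List<byte[][]> ranges = new ArrayList<>();
        for (int salt = 0; salt < saltDivisor; salt++) {
          ranges.add(new byte[][] {prefix(salt, hash, first), prefix(salt, hash, last + 1)});
        }
        return ranges;
      }
    }
    ```

    Filtering to a single entity across a range of periods would still need either a server-side filter or one prefix per period, since the entity follows the period in this order.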



[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on the issue:

    https://github.com/apache/metron/pull/622
  
    And btw, since there is no easily expressed algorithm for the NLP part of the problem, I'm +1 on doing both a decodable rowkey and a ToC.  For the existing profiles that @cestella expressed concern about, I would point out that as long as one DOES have the Profile specs still lying around, it's actually easy to rewrite the old Profiles into the new format with decodable rowkeys.  That is a very modest-sized program; the main problem is noticing and dealing with Profiles that share a name but have different periodDurations.  But the info I pointed out in the paper helps sufficiently, I think.
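    A sketch of the duplicate check mentioned above, assuming the migration has already collected (profile name, period duration) pairs from the old Profile specs; `DuplicateProfileNames` and its `def` helper are hypothetical. Assuming the legacy row key does not record the period duration (the new one stores it explicitly), a name defined with more than one duration is ambiguous and would need manual attention before its rows are rewritten.

    ```java
    import java.util.AbstractMap;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeMap;
    import java.util.TreeSet;

    final class DuplicateProfileNames {

      // Groups (profile name, period duration in ms) pairs by name and returns
      // the names that were defined with more than one distinct duration.
      static Set<String> ambiguousNames(List<Map.Entry<String, Long>> definitions) {
        Map<String, Set<Long>> durationsByName = new TreeMap<>();
        for (Map.Entry<String, Long> def : definitions) {
          durationsByName.computeIfAbsent(def.getKey(), k -> new TreeSet<>()).add(def.getValue());
        }
        Set<String> ambiguous = new TreeSet<>();
        for (Map.Entry<String, Set<Long>> entry : durationsByName.entrySet()) {
          if (entry.getValue().size() > 1) {
            ambiguous.add(entry.getKey());
          }
        }
        return ambiguous;
      }

      static Map.Entry<String, Long> def(String name, long durationMillis) {
        return new AbstractMap.SimpleEntry<>(name, durationMillis);
      }

      public static void main(String[] args) {
        List<Map.Entry<String, Long>> defs = Arrays.asList(
            def("bytes-out", 900_000L), def("bytes-out", 3_600_000L), def("logins", 900_000L));
        System.out.println(ambiguousNames(defs)); // [bytes-out]
      }
    }
    ```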


[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129252112
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.</li>
    + * <li>profile - The name of the profile.</li>
    + * <li>entity - The name of the entity being profiled.</li>
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.</li>
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.</li>
    + * <li>period duration - The duration of each period in milliseconds.</li>
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the HBase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    long earliest = Math.min(start, end);
    +    end = Math.max(start, end);
    +    start = earliest;
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = Bytes.toString(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = Bytes.toString(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = Bytes.toString(groupBytes);
    +        groups.add(group);
    +      }
    +
    +      // decode the period
    +      long periodId = buffer.getLong();
    +      long duration = buffer.getLong();
    +      ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS);
    +
    +      return new ProfileMeasurement()
    +              .withProfileName(profile)
    +              .withEntity(entity)
    +              .withGroups(groups)
    +              .withProfilePeriod(period);
    +
    +    } catch(BufferUnderflowException e) {
    +      throw new IllegalArgumentException("Unable to decode the row key", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Builds the 'group' component of the row key.
    +   * @param groups The groups to include in the row key.
    +   */
    +  private byte[] encodeGroups(List<Object> groups) {
    +
    +    // encode each of the groups and determine their size
    +    int lengthOfGroups = 0;
    +    List<byte[]> groupBytes = new ArrayList<>();
    +    for(Object group : groups) {
    +      byte[] groupB = Bytes.toBytes(String.valueOf(group));
    +      groupBytes.add(groupB);
    +      lengthOfGroups += groupB.length;
    +    }
    +
    +    // encode each of the groups
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate((Integer.BYTES * (1 + groups.size())) + lengthOfGroups)
    +            .order(byteOrder)
    +            .putInt(groups.size());
    +
    +    for(byte[] groupB : groupBytes) {
    +      buffer.putInt(groupB.length).put(groupB);
    +    }
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period The ProfilePeriod in which the ProfileMeasurement was taken.
    +   */
    +  private static byte[] encodePeriod(ProfilePeriod period) {
    +    return encodePeriod(period.getPeriod());
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period the period
    +   */
    +  private static byte[] encodePeriod(long period) {
    +    return Bytes.toBytes(period);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the HBase cluster.
    +   *
    +   * @param period The period in which a profile measurement is taken.
    +   */
    +  public static byte[] encodeSalt(ProfilePeriod period, int saltDivisor) {
    +    return encodeSalt(period.getPeriod(), saltDivisor);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the HBase cluster.
    +   *
    +   * @param period The period
    +   * @param saltDivisor The salt divisor
    +   */
    +  public static byte[] encodeSalt(long period, int saltDivisor) {
    +    try {
    +      // an MD5 is 16 bytes aka 128 bits
    +      MessageDigest digest = MessageDigest.getInstance("MD5");
    +      byte[] hash = digest.digest(encodePeriod(period));
    +      int salt = Bytes.toShort(hash) % saltDivisor;
    +      return Bytes.toBytes(salt);
    --- End diff --
    
    Chiming in in favor of a prefixed hash. @mattf-horton, you're right about the hash preventing hotspotting.  Consider the case where we are doing batch updates of profiles (you have a new profile and you want to backfill data).  In that case, you could be sending quite a lot of puts through.
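    A standalone sketch of the `md5(period) % N` salt, written against the JDK only (no HBase `Bytes`); `PeriodSalt` is hypothetical. It hashes the period as an 8-byte big-endian long and reads the first two digest bytes as a signed short, matching what `encodePeriod()` and `Bytes.toShort(hash)` do in the diff. One difference is deliberate: Java's `%` keeps the sign of the dividend, so the diff's `Bytes.toShort(hash) % saltDivisor` can produce negative salts, whereas this sketch uses `Math.floorMod` to stay in `[0, saltDivisor)`. Because the salt depends only on the period, a backfill that writes many periods spreads its puts across buckets, while all writes for a single period share one salt.

    ```java
    import java.nio.ByteBuffer;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    // Hypothetical JDK-only version of the diff's encodeSalt(long period, int saltDivisor).
    final class PeriodSalt {

      static int salt(long period, int saltDivisor) {
        try {
          // 8-byte big-endian period, the same bytes Bytes.toBytes(long) produces
          byte[] periodBytes = ByteBuffer.allocate(Long.BYTES).putLong(period).array();
          byte[] hash = MessageDigest.getInstance("MD5").digest(periodBytes);
          // first two digest bytes as a signed big-endian short, like Bytes.toShort(hash)
          short prefix = ByteBuffer.wrap(hash).getShort();
          // floorMod keeps the result in [0, saltDivisor); plain '%' would keep
          // the sign of a negative prefix
          return Math.floorMod(prefix, saltDivisor);
        } catch (NoSuchAlgorithmException e) {
          throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
      }
    }
    ```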


[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r127330773
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,382 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.</li>
    + * <li>profile - The name of the profile.</li>
    + * <li>entity - The name of the entity being profiled.</li>
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.</li>
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.</li>
    + * <li>period duration - The duration of each period in milliseconds.</li>
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    --- End diff --
    
    I added a `VERSION` field to the row key, hoping that it might help with future changes to the `RowKeyBuilder`.  With it, I could start to parse the row key and then choose the right `RowKeyBuilder` implementation, namely the one used to create the row key.  This would make row key changes seamless to users.
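    A minimal sketch of that dispatch, assuming every row key starts with the 2-byte magic number and 1-byte version exactly as `DecodableRowKeyBuilder` writes them; `VersionedDecoder` and its `register` method are hypothetical. Keys written by the legacy `SaltyRowKeyBuilder` carry no such header, so a real dispatcher would still need a fallback for them, and moving the salt to the front (as proposed elsewhere in this thread) would shift where the version byte sits.

    ```java
    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    // Hypothetical dispatcher: peek at the magic number and version byte, then hand
    // the whole row key to the decoder registered for that version.
    final class VersionedDecoder<T> {

      private static final short MAGIC_NUMBER = 77; // same value as the diff
      private final Map<Byte, Function<byte[], T>> decoders = new HashMap<>();

      VersionedDecoder<T> register(byte version, Function<byte[], T> decoder) {
        decoders.put(version, decoder);
        return this;
      }

      T decode(byte[] rowKey) {
        if (rowKey.length < Short.BYTES + 1) {
          throw new IllegalArgumentException("Row key too short to hold magic number and version");
        }
        ByteBuffer buffer = ByteBuffer.wrap(rowKey); // big-endian by default, like the builder
        short magic = buffer.getShort();
        if (magic != MAGIC_NUMBER) {
          throw new IllegalArgumentException("Unknown magic number: " + magic);
        }
        byte version = buffer.get();
        Function<byte[], T> decoder = decoders.get(version);
        if (decoder == null) {
          throw new IllegalArgumentException("No decoder registered for version " + version);
        }
        return decoder.apply(rowKey);
      }
    }
    ```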


[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r127325245
  
    --- Diff: metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/RowKeyBuilderFactory.java ---
    @@ -0,0 +1,125 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.client.stellar;
    +
    +import org.apache.commons.beanutils.PropertyUtils;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.metron.common.utils.ReflectionUtils;
    +import org.apache.metron.profiler.hbase.RowKeyBuilder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_ROW_KEY_BUILDER;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * A Factory class that can create a RowKeyBuilder based on global property values.
    + */
    +public class RowKeyBuilderFactory {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(RowKeyBuilderFactory.class);
    +
    +  /**
    +   * Create a RowKeyBuilder.
    +   * @param global The global properties.
    +   * @return A RowKeyBuilder instantiated using the global property values.
    +   */
    +  public static RowKeyBuilder create(Map<String, Object> global) {
    +    String rowKeyBuilderClass = PROFILER_ROW_KEY_BUILDER.get(global, String.class);
    +    LOG.debug("profiler client: {}={}", PROFILER_ROW_KEY_BUILDER, rowKeyBuilderClass);
    +
    +    // instantiate the RowKeyBuilder
    +    RowKeyBuilder builder = ReflectionUtils.createInstance(rowKeyBuilderClass);
    +    setSaltDivisor(global, builder);
    +    setPeriodDuration(global, builder);
    --- End diff --
    
    Here is the logic to instantiate a `RowKeyBuilder` that is used by the Profiler Client's `GetProfile`.
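The factory resolves a class name from the global properties and instantiates that class reflectively. The sketch below shows the same general idea using only the JDK. `ReflectiveFactorySketch`, `KeyBuilder`, `DefaultKeyBuilder`, and the property name are hypothetical. The sketch does not reproduce Metron's `ReflectionUtils.createInstance`, nor the salt divisor and period setters shown in the diff.

```java
import java.util.HashMap;
import java.util.Map;

public class ReflectiveFactorySketch {

  /** Hypothetical stand-in for the RowKeyBuilder interface. */
  public interface KeyBuilder {
    String describe();
  }

  /** Hypothetical default implementation; reflection needs a public no-arg constructor. */
  public static class DefaultKeyBuilder implements KeyBuilder {
    public DefaultKeyBuilder() { }

    @Override
    public String describe() {
      return "default";
    }
  }

  /** Hypothetical property name; the PR reads PROFILER_ROW_KEY_BUILDER instead. */
  public static final String BUILDER_PROPERTY = "row.key.builder";

  public static KeyBuilder create(Map<String, Object> global) {
    // fall back to the default implementation when the property is absent
    String className = String.valueOf(
        global.getOrDefault(BUILDER_PROPERTY, DefaultKeyBuilder.class.getName()));
    try {
      return Class.forName(className)
          .asSubclass(KeyBuilder.class)   // throws ClassCastException for the wrong type
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalStateException("Unable to instantiate " + className, e);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> global = new HashMap<>();
    System.out.println(create(global).describe()); // prints "default"
  }
}
```

Calling `asSubclass` before instantiating makes a misconfigured class name fail when the builder is created, rather than surfacing later as a confusing error during a `GetProfile` call.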


[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129226069
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the HBase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    if (start > end) {
    +      long swap = start;
    +      start = end;
    +      end = swap;
    +    }
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = Bytes.toString(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = Bytes.toString(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = Bytes.toString(groupBytes);
    +        groups.add(group);
    +      }
    +
    +      // decode the period
    +      long periodId = buffer.getLong();
    +      long duration = buffer.getLong();
    +      ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS);
    +
    +      return new ProfileMeasurement()
    +              .withProfileName(profile)
    +              .withEntity(entity)
    +              .withGroups(groups)
    +              .withProfilePeriod(period);
    +
    +    } catch(BufferUnderflowException e) {
    +      throw new IllegalArgumentException("Unable to decode the row key", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Builds the 'group' component of the row key.
    +   * @param groups The groups to include in the row key.
    +   */
    +  private byte[] encodeGroups(List<Object> groups) {
    +
    +    // encode each of the groups and determine their size
    +    int lengthOfGroups = 0;
    +    List<byte[]> groupBytes = new ArrayList<>();
    +    for(Object group : groups) {
    +      byte[] groupB = Bytes.toBytes(String.valueOf(group));
    +      groupBytes.add(groupB);
    +      lengthOfGroups += groupB.length;
    +    }
    +
    +    // encode each of the groups
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate((Integer.BYTES * (1 + groups.size())) + lengthOfGroups)
    +            .order(byteOrder)
    +            .putInt(groups.size());
    +
    +    for(byte[] groupB : groupBytes) {
    +      buffer.putInt(groupB.length).put(groupB);
    +    }
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period The ProfilePeriod in which the ProfileMeasurement was taken.
    +   */
    +  private static byte[] encodePeriod(ProfilePeriod period) {
    +    return encodePeriod(period.getPeriod());
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period the period
    +   */
    +  private static byte[] encodePeriod(long period) {
    +    return Bytes.toBytes(period);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the HBase cluster.
    +   *
    +   * @param period The period in which a profile measurement is taken.
    +   */
    +  public static byte[] encodeSalt(ProfilePeriod period, int saltDivisor) {
    +    return encodeSalt(period.getPeriod(), saltDivisor);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the HBase cluster.
    +   *
    +   * @param period The period
    +   * @param saltDivisor The salt divisor
    +   */
    +  public static byte[] encodeSalt(long period, int saltDivisor) {
    +    try {
    +      // an MD5 is 16 bytes aka 128 bits
    +      MessageDigest digest = MessageDigest.getInstance("MD5");
    +      byte[] hash = digest.digest(encodePeriod(period));
    +      int salt = Bytes.toShort(hash) % saltDivisor;
    +      return Bytes.toBytes(salt);
    --- End diff --
    
    @All, please note that my overall review comment (as opposed to these detailed comments) was not published to email by GitHub. Please see it at https://github.com/apache/metron/pull/622#pullrequestreview-51932717
    It does address the issue of changed profile specs being different profiles.
    
    @simonellistonball, your question raises the broader issue of why we use salts at all. Apparently the leading bytes are important; if they weren't, there would be no reason to use a salt, and we could simply trust the later bytes to differentiate the partition (region) for each row. As I (imperfectly) understand HBase table partitioning, the more times a table has been split, and the broader and more well-mixed the set of row keys it holds, the more likely it is that row keys will end up in the same partition (region) unless they differ early in the byte array. So, if the HBase instance mostly holds data with similar row keys, the bytes later in the row key, such as the profile name, will suffice. But as the table fills with a broader range of row keys, as it will under our salting strategy, and as it grows over time, a burst of inserts that share the same salt (the same leading few bytes) becomes more likely to land in the same partition.
    
    I'm not an HBase expert, so if you feel I've got this wrong, please clarify.
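The salt discussed here is computed in `encodeSalt` as `md5(period) % N`. The dependency-free sketch below approximates that calculation, substituting plain JDK calls for HBase's `Bytes` helpers. It hashes the 8 big-endian bytes of the period, reads the first two digest bytes as a signed `short`, and reduces that value modulo N. One difference is deliberate. Java's `%` keeps the sign of the dividend, so the diff's `Bytes.toShort(hash) % saltDivisor` can return negative values (up to 2N-1 distinct salts). The sketch instead uses `Math.floorMod` to keep salts in [0, N). The diff's layout is also relevant to the region question. The salt is preceded by the magic number, the version, and the salt's length, which is always 4 (2 + 1 + 4 bytes). As far as the quoted code shows, therefore, the first seven bytes of every row key are identical.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class SaltSketch {

  /**
   * Computes md5(period) mod saltDivisor. The first two digest bytes are read as a
   * big-endian signed short, as the diff does via HBase's Bytes.toShort. Math.floorMod
   * keeps the result in [0, saltDivisor), unlike the diff's plain '%'.
   */
  public static int salt(long period, int saltDivisor) {
    try {
      byte[] periodBytes = ByteBuffer.allocate(Long.BYTES).putLong(period).array();
      byte[] hash = MessageDigest.getInstance("MD5").digest(periodBytes);
      short leading = ByteBuffer.wrap(hash).getShort(); // first two bytes, big-endian
      return Math.floorMod(leading, saltDivisor);
    } catch (NoSuchAlgorithmException e) {
      // every Java platform is required to support MD5
      throw new IllegalStateException("MD5 unavailable", e);
    }
  }

  public static void main(String[] args) {
    int saltDivisor = 4;
    int[] counts = new int[saltDivisor];
    // consecutive periods, as a profile would write them over time
    for (long period = 0; period < 1000; period++) {
      counts[salt(period, saltDivisor)]++;
    }
    for (int i = 0; i < saltDivisor; i++) {
      System.out.printf("salt %d: %d periods%n", i, counts[i]);
    }
  }
}
```

Because the salt depends only on the period, all measurements written during one period share a salt. The spreading happens across consecutive periods, not across profiles or entities written at the same time.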


[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by simonellistonball <gi...@git.apache.org>.
Github user simonellistonball commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129221553
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the HBase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    if (start > end) {
    +      long swap = start;
    +      start = end;
    +      end = swap;
    +    }
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = Bytes.toString(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = Bytes.toString(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = new String(groupBytes);
    +        groups.add(group);
    +      }
    +
    +      // decode the period
    +      long periodId = buffer.getLong();
    +      long duration = buffer.getLong();
    +      ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS);
    +
    +      return new ProfileMeasurement()
    +              .withProfileName(profile)
    +              .withEntity(entity)
    +              .withGroups(groups)
    +              .withProfilePeriod(period);
    +
    +    } catch(BufferUnderflowException e) {
    +      throw new IllegalArgumentException("Unable to decode the row key", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Builds the 'group' component of the row key.
    +   * @param groups The groups to include in the row key.
    +   */
    +  private byte[] encodeGroups(List<Object> groups) {
    +
    +    // encode each of the groups and determine their size
    +    int lengthOfGroups = 0;
    +    List<byte[]> groupBytes = new ArrayList<>();
    +    for(Object group : groups) {
    +      byte[] groupB = Bytes.toBytes(String.valueOf(group));
    +      groupBytes.add(groupB);
    +      lengthOfGroups += groupB.length;
    +    }
    +
    +    // encode each of the groups
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate((Integer.BYTES * (1 + groups.size())) + lengthOfGroups)
    +            .order(byteOrder)
    +            .putInt(groups.size());
    +
    +    for(byte[] groupB : groupBytes) {
    +      buffer.putInt(groupB.length).put(groupB);
    +    }
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period The ProfilePeriod in which the ProfileMeasurement was taken.
    +   */
    +  private static byte[] encodePeriod(ProfilePeriod period) {
    +    return encodePeriod(period.getPeriod());
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period the period
    +   */
    +  private static byte[] encodePeriod(long period) {
    +    return Bytes.toBytes(period);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the Hbase cluster.
    +   *
    +   * @param period The period in which a profile measurement is taken.
    +   */
    +  public static byte[] encodeSalt(ProfilePeriod period, int saltDivisor) {
    +    return encodeSalt(period.getPeriod(), saltDivisor);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the Hbase cluster.
    +   *
    +   * @param period The period
    +   * @param saltDivisor The salt divisor
    +   */
    +  public static byte[] encodeSalt(long period, int saltDivisor) {
    +    try {
    +      // an MD5 is 16 bytes aka 128 bits
    +      MessageDigest digest = MessageDigest.getInstance("MD5");
    +      byte[] hash = digest.digest(encodePeriod(period));
    +      int salt = Bytes.toShort(hash) % saltDivisor;
    +      return Bytes.toBytes(salt);
    --- End diff --
    
    Doesn't the profile name in the key already achieve this? We should probably enforce that profile names are unique in the config (we may already). I suppose there is an argument that if you change a profile's config it is no longer the same profile, and so it should essentially lose its memory, but that's probably a bigger discussion.
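The quoted `encodeSalt` hunk derives the salt from the first two bytes of an MD5 digest of the period. `Bytes.toShort` returns a signed `short`, and Java's `%` keeps the sign of the dividend, so that expression can produce negative salts in the range -(N-1)..N-1 rather than 0..N-1. The sketch below is a hypothetical variant, not the code in this PR. It uses only the JDK: it writes the period as 8 big-endian bytes, as HBase's `Bytes.toBytes(long)` does, and it applies `Math.floorMod` so the salt stays in [0, N). The class name `SaltSketch` is illustrative.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of a non-negative salt; names are illustrative, not Metron's API.
final class SaltSketch {

  // Returns floorMod(first two bytes of md5(period) read as a signed short, saltDivisor).
  static int salt(long period, int saltDivisor) {
    try {
      // encode the period as 8 big-endian bytes (ByteBuffer defaults to big-endian)
      byte[] periodBytes = ByteBuffer.allocate(Long.BYTES).putLong(period).array();
      byte[] hash = MessageDigest.getInstance("MD5").digest(periodBytes);
      // read the first two bytes as a signed short, like Bytes.toShort(hash)
      short prefix = ByteBuffer.wrap(hash).getShort();
      // floorMod keeps the result in [0, saltDivisor) even when prefix is negative
      return Math.floorMod(prefix, saltDivisor);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  // Encodes the salt as 4 big-endian bytes, matching the layout of Bytes.toBytes(int).
  static byte[] encodeSalt(long period, int saltDivisor) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(salt(period, saltDivisor)).array();
  }
}
```

With `Math.floorMod`, every salt falls into one of exactly N buckets, which is what the "roughly equal to the number of nodes" guidance assumes.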


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r127328660
  
    --- Diff: metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java ---
    @@ -216,21 +211,7 @@ private ColumnBuilder getColumnBuilder(Map<String, Object> global) {
        * @param global The global configuration.
        */
       private RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
    -
    -    // how long is the profile period?
    -    long duration = PROFILER_PERIOD.get(global, Long.class);
    -    LOG.debug("profiler client: {}={}", PROFILER_PERIOD, duration);
    -
    -    // which units are used to define the profile period?
    -    String configuredUnits = PROFILER_PERIOD_UNITS.get(global, String.class);
    -    TimeUnit units = TimeUnit.valueOf(configuredUnits);
    -    LOG.debug("profiler client: {}={}", PROFILER_PERIOD_UNITS, units);
    -
    -    // what is the salt divisor?
    -    Integer saltDivisor = PROFILER_SALT_DIVISOR.get(global, Integer.class);
    -    LOG.debug("profiler client: {}={}", PROFILER_SALT_DIVISOR, saltDivisor);
    -
    -    return new SaltyRowKeyBuilder(saltDivisor, duration, units);
    +    return RowKeyBuilderFactory.create(global);
    --- End diff --
    
    This is where we instantiate the `RowKeyBuilder` for the Profiler Client API. As I will discuss in another thread, the logic got complex and kind of nasty, so I encapsulated it in its own `RowKeyBuilderFactory`. See that class for further discussion of why it is nasty.
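The removed lines show the work such a factory must do: read the period duration, its units, and the salt divisor from the global configuration, then construct a builder. Below is a minimal sketch of that lookup-with-defaults step. The property names, the `RowKeySettings` class, and the tolerance for numeric-or-string values are assumptions for illustration. The defaults (15 minutes, salt divisor 1000) mirror the no-arg `DecodableRowKeyBuilder()` constructor in the second revision quoted later in this thread. The real `RowKeyBuilderFactory` is not shown here.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: resolves row key settings from a global config map, with defaults.
final class RowKeySettings {
  // assumed property names; check ProfilerClientConfig for the real keys
  static final String PERIOD_KEY = "profiler.client.period.duration";
  static final String UNITS_KEY = "profiler.client.period.duration.units";
  static final String SALT_KEY = "profiler.client.salt.divisor";

  final long periodDurationMillis;
  final int saltDivisor;

  private RowKeySettings(long periodDurationMillis, int saltDivisor) {
    this.periodDurationMillis = periodDurationMillis;
    this.saltDivisor = saltDivisor;
  }

  static RowKeySettings fromGlobal(Map<String, Object> global) {
    // values may arrive as any Number or as a String, depending on how the config was parsed
    long duration = toLong(global.getOrDefault(PERIOD_KEY, 15L));
    TimeUnit units = TimeUnit.valueOf(String.valueOf(global.getOrDefault(UNITS_KEY, "MINUTES")));
    int saltDivisor = Math.toIntExact(toLong(global.getOrDefault(SALT_KEY, 1000)));
    if (saltDivisor <= 0) {
      throw new IllegalArgumentException("salt divisor must be positive, got " + saltDivisor);
    }
    return new RowKeySettings(units.toMillis(duration), saltDivisor);
  }

  private static long toLong(Object value) {
    if (value instanceof Number) {
      return ((Number) value).longValue();
    }
    return Long.parseLong(String.valueOf(value));
  }
}
```

Normalizing everything to milliseconds up front means the builder only ever needs one duration value, which is how the quoted `DecodableRowKeyBuilder` constructor stores it.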



[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on the issue:

    https://github.com/apache/metron/pull/622
  
    Here's what I've got on decoding old rowkeys:
    https://gist.github.com/mattf-horton/8e685e373b1a3fa6aeec8ef8828be096
    
    The format of the keys is
    `salt (4B) + profile name (?) + entity name (?) + groupvalues (?) + period (8B)`
    with most of it (all but the salt and period number) in the clear as human-readable strings.
    
    Deducing periodDuration has a nice arithmetic answer, I think.
    The NLP issues are, of course, harder. Enjoy the read; it's only two pages.
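Given the legacy layout described above (a 4-byte salt; then the profile, entity, and group values concatenated as plain strings; then an 8-byte period), the fixed-width ends can be peeled off deterministically. The middle can only be recovered as one undelimited string; splitting it back into profile, entity, and groups is the harder problem mentioned above. The sketch below assumes big-endian integers, as produced by HBase's `Bytes.toBytes`, and uses a hypothetical `LegacyKey` class. The `durationBounds` helper is a separate illustrative assumption, not necessarily the approach taken in the gist. If a timestamp t is known to fall inside period p, then p·d ≤ t < (p+1)·d, which bounds the duration d.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch for peeling apart a legacy (pre-PR) row key.
final class LegacyKey {
  final int salt;
  final String middle;   // profile + entity + group values, concatenated with no delimiters
  final long periodId;

  LegacyKey(int salt, String middle, long periodId) {
    this.salt = salt;
    this.middle = middle;
    this.periodId = periodId;
  }

  static LegacyKey parse(byte[] rowKey) {
    if (rowKey.length < Integer.BYTES + Long.BYTES) {
      throw new IllegalArgumentException("row key too short: " + rowKey.length + " bytes");
    }
    ByteBuffer buffer = ByteBuffer.wrap(rowKey); // big-endian by default
    int salt = buffer.getInt(0);                                  // leading 4 bytes
    long periodId = buffer.getLong(rowKey.length - Long.BYTES);   // trailing 8 bytes
    byte[] middle = Arrays.copyOfRange(rowKey, Integer.BYTES, rowKey.length - Long.BYTES);
    return new LegacyKey(salt, new String(middle, StandardCharsets.UTF_8), periodId);
  }

  // If timestamp t (millis) lies in period p > 0, then p*d <= t < (p+1)*d, so the
  // duration d satisfies lower < d <= upper, with lower = t/(p+1) and upper = t/p (integer division).
  static long[] durationBounds(long periodId, long timestampMillis) {
    if (periodId <= 0) {
      throw new IllegalArgumentException("period id must be positive");
    }
    long lowerExclusive = timestampMillis / (periodId + 1);
    long upperInclusive = timestampMillis / periodId;
    return new long[] { lowerExclusive, upperInclusive };
  }
}
```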




[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129309663
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java ---
    @@ -81,20 +99,19 @@ public SaltyRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
        * @return All of the row keys necessary to retrieve the profile measurements.
        */
       @Override
    -  public List<byte[]> rowKeys(String profile, String entity, List<Object> groups, long start, long end) {
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
         // be forgiving of out-of-order start and end times; order is critical to this algorithm
         end = Math.max(start, end);
         start = Math.min(start, end);
    --- End diff --
    
    This does look fishy. I will open a separate JIRA to track it. Thanks, @mattf-horton!
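The quoted lines assign `end = Math.max(start, end)` first, so the following `Math.min(start, end)` always returns the original `start`. When the inputs arrive reversed, both variables end up holding the later time, and the query range collapses to a single instant. The sketch below shows an order-independent normalization that computes both bounds from the original values before assigning either. The `TimeRange` helper is hypothetical and keeps the quoted form alongside for comparison.

```java
// Hypothetical helper showing order-independent normalization of a time range.
final class TimeRange {
  final long start;
  final long end;

  private TimeRange(long start, long end) {
    this.start = start;
    this.end = end;
  }

  // As quoted in the diff: end is reassigned before start is computed.
  static TimeRange normalizeAsQuoted(long start, long end) {
    end = Math.max(start, end);
    start = Math.min(start, end); // always the original start, because end >= start now
    return new TimeRange(start, end);
  }

  // Fixed variant: derive both bounds from the original inputs.
  static TimeRange normalize(long start, long end) {
    long lo = Math.min(start, end);
    long hi = Math.max(start, end);
    return new TimeRange(lo, hi);
  }
}
```

For reversed input `(200, 100)`, the quoted form yields `[200, 200]`, while the fixed form yields `[100, 200]`.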



[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129356631
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the Hbase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    end = Math.max(start, end);
    +    start = Math.min(start, end);
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    --- End diff --
    
    @nickwallen, right, now I see that each group value is itself L/V (length/value) encoded. I would suggest, then, that a count of group values should precede the set of L/V-encoded group values. Again, it's not strictly necessary, but it allows deterministic decoding without depending on groups being the last variable-length element in the row key.
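The suggestion amounts to framing the groups as a count followed by length/value pairs. That is also what the `encodeGroups` hunk quoted earlier in this thread writes, via `putInt(groups.size())`. Below is a minimal round-trip sketch of that framing. It assumes UTF-8 strings and big-endian integers and uses a hypothetical `GroupCodec` class. Because the decoder reads exactly the counted number of values, it leaves the buffer positioned at whatever field follows, so groups need not be the last variable-length field.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of count-prefixed, length/value-encoded group values.
final class GroupCodec {

  static byte[] encode(List<?> groups) {
    List<byte[]> encoded = new ArrayList<>();
    int size = Integer.BYTES; // space for the leading count
    for (Object group : groups) {
      byte[] bytes = String.valueOf(group).getBytes(StandardCharsets.UTF_8);
      encoded.add(bytes);
      size += Integer.BYTES + bytes.length; // length prefix + value
    }
    ByteBuffer buffer = ByteBuffer.allocate(size).putInt(encoded.size());
    for (byte[] bytes : encoded) {
      buffer.putInt(bytes.length).put(bytes);
    }
    return buffer.array();
  }

  // Reads a count, then exactly that many length/value pairs; the buffer is left just past the groups.
  static List<String> decode(ByteBuffer buffer) {
    int count = buffer.getInt();
    if (count < 0) {
      throw new IllegalArgumentException("negative group count: " + count);
    }
    List<String> groups = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      int length = buffer.getInt();
      if (length < 0 || length > buffer.remaining()) {
        throw new IllegalArgumentException("bad group length: " + length);
      }
      byte[] bytes = new byte[length];
      buffer.get(bytes);
      groups.add(new String(bytes, StandardCharsets.UTF_8));
    }
    return groups;
  }
}
```

Group values come back as strings, as in the quoted `decode`, because the encoder stores `String.valueOf(group)`.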



[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on the issue:

    https://github.com/apache/metron/pull/622
  
    Let me take a look at this more deeply.



[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129356740
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,382 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the Hbase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(1000, 15, TimeUnit.MINUTES);
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    end = Math.max(start, end);
    +    start = Math.min(start, end);
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = new String(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = new String(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = new String(groupBytes);
    +        groups.add(group);
    +      }
    +
    +      // decode the period
    +      long periodId = buffer.getLong();
    +      long duration = buffer.getLong();
    +      ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS);
    +
    +      return new ProfileMeasurement()
    +              .withProfileName(profile)
    +              .withEntity(entity)
    +              .withGroups(groups)
    +              .withProfilePeriod(period);
    +
    +    } catch(BufferUnderflowException e) {
    +      throw new IllegalArgumentException("Unable to decode the row key", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Builds the 'group' component of the row key.
    +   * @param groups The groups to include in the row key.
    +   */
    +  private byte[] encodeGroups(List<Object> groups) {
    +
    +    // encode each of the groups and determine their size
    +    int lengthOfGroups = 0;
    +    List<byte[]> groupBytes = new ArrayList<>();
    +    for(Object group : groups) {
    +      byte[] groupB = Bytes.toBytes(String.valueOf(group));
    +      groupBytes.add(groupB);
    +      lengthOfGroups += groupB.length;
    +    }
    --- End diff --
    
    @nickwallen, I retract this comment. Sorry, I didn't read carefully enough. Of course length/value (L/V) encoding is better.
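Length/value (L/V) encoding prefixes each field with its byte length instead of separating fields with a delimiter. The decoder therefore never scans for a separator, and a value may contain any byte. Below is a minimal sketch of the layout that `encodeGroups` writes and the group loop in `decode` reads in the diff above: a count, then a length and the bytes for each group. It assumes 4-byte big-endian lengths and UTF-8 strings. The class and method names are hypothetical; this is not the PR's code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper illustrating length/value (L/V) encoding of a list of strings. */
public class LengthValueSketch {

  /** Encodes [count][len0][bytes0][len1][bytes1]... using 4-byte big-endian ints. */
  public static byte[] encode(List<String> values) {
    List<byte[]> encoded = new ArrayList<>();
    int size = Integer.BYTES; // room for the count
    for (String v : values) {
      byte[] b = v.getBytes(StandardCharsets.UTF_8);
      encoded.add(b);
      size += Integer.BYTES + b.length; // length prefix plus payload
    }
    ByteBuffer buffer = ByteBuffer.allocate(size).putInt(values.size());
    for (byte[] b : encoded) {
      buffer.putInt(b.length).put(b);
    }
    return buffer.array();
  }

  /** Decodes the layout written by encode(); each length says exactly how many bytes to read. */
  public static List<String> decode(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    int count = buffer.getInt();
    List<String> values = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      byte[] b = new byte[buffer.getInt()];
      buffer.get(b);
      values.add(new String(b, StandardCharsets.UTF_8));
    }
    return values;
  }

  public static void main(String[] args) {
    // A value containing a would-be delimiter survives the round trip intact.
    List<String> groups = Arrays.asList("weekday", "a|b", "");
    System.out.println(decode(encode(groups))); // prints [weekday, a|b, ]
  }
}
```

The decoder in the diff rejects zero-length groups (`groupLength <= 0`), while this sketch accepts them. The sketch also pins UTF-8 on both sides, whereas the diff decodes with `new String(bytes)`, which uses the platform default charset.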


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129227588
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the Hbase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    end = Math.max(start, end);
    +    start = Math.min(start, end);
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = new String(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = new String(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = new String(groupBytes);
    +        groups.add(group);
    +      }
    +
    +      // decode the period
    +      long periodId = buffer.getLong();
    +      long duration = buffer.getLong();
    +      ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS);
    +
    +      return new ProfileMeasurement()
    +              .withProfileName(profile)
    +              .withEntity(entity)
    +              .withGroups(groups)
    +              .withProfilePeriod(period);
    +
    +    } catch(BufferUnderflowException e) {
    +      throw new IllegalArgumentException("Unable to decode the row key", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Builds the 'group' component of the row key.
    +   * @param groups The groups to include in the row key.
    +   */
    +  private byte[] encodeGroups(List<Object> groups) {
    +
    +    // encode each of the groups and determine their size
    +    int lengthOfGroups = 0;
    +    List<byte[]> groupBytes = new ArrayList<>();
    +    for(Object group : groups) {
    +      byte[] groupB = Bytes.toBytes(String.valueOf(group));
    +      groupBytes.add(groupB);
    +      lengthOfGroups += groupB.length;
    +    }
    +
    +    // encode each of the groups
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate((Integer.BYTES * (1 + groups.size())) + lengthOfGroups)
    +            .order(byteOrder)
    +            .putInt(groups.size());
    +
    +    for(byte[] groupB : groupBytes) {
    +      buffer.putInt(groupB.length).put(groupB);
    +    }
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period The ProfilePeriod in which the ProfileMeasurement was taken.
    +   */
    +  private static byte[] encodePeriod(ProfilePeriod period) {
    +    return encodePeriod(period.getPeriod());
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period the period
    +   */
    +  private static byte[] encodePeriod(long period) {
    +    return Bytes.toBytes(period);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the Hbase cluster.
    +   *
    +   * @param period The period in which a profile measurement is taken.
    +   */
    +  public static byte[] encodeSalt(ProfilePeriod period, int saltDivisor) {
    +    return encodeSalt(period.getPeriod(), saltDivisor);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the Hbase cluster.
    +   *
    +   * @param period The period
    +   * @param saltDivisor The salt divisor
    +   */
    +  public static byte[] encodeSalt(long period, int saltDivisor) {
    +    try {
    +      // an MD5 is 16 bytes aka 128 bits
    +      MessageDigest digest = MessageDigest.getInstance("MD5");
    +      byte[] hash = digest.digest(encodePeriod(period));
    +      int salt = Bytes.toShort(hash) % saltDivisor;
    +      return Bytes.toBytes(salt);
    --- End diff --
    
    I should acknowledge that the current salting strategy is probably sufficient. Each active Profile typically records a single row only every few _minutes_. So if 20 or 30 active Profiles bunch up in a single partition, and then do it again a few minutes later in a different partition, that probably doesn't qualify as a "hotspot". That situation is unlike the problem with unsalted, timestamp-based row keys that receive thousands of inserts per second.
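The point above follows from the salt depending only on the period: `encodeSalt(periodId, saltDivisor)` in the diff hashes the period and nothing else. Every profile and entity written during one period therefore shares a salt, and the salt moves only when the period changes. The following is a minimal sketch of the `md5(period) % N` computation described in the diff's Javadoc. It is intended to mirror `Bytes.toBytes(long)` and `Bytes.toShort(hash)` using plain `ByteBuffer` (big-endian by default) so that it runs without HBase. The class name and the sample period id are hypothetical.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical re-creation of the 'md5(period) % N' salt described in the diff's Javadoc. */
public class SaltSketch {

  public static int salt(long periodId, int saltDivisor) {
    try {
      // Big-endian 8-byte encoding of the period id (ByteBuffer's default order).
      byte[] periodBytes = ByteBuffer.allocate(Long.BYTES).putLong(periodId).array();
      byte[] hash = MessageDigest.getInstance("MD5").digest(periodBytes);
      // Interpret the first two bytes of the 16-byte digest as a signed short.
      short prefix = ByteBuffer.wrap(hash).getShort();
      // Java's % keeps the sign of the dividend, so the result lies in (-N, N).
      return prefix % saltDivisor;
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  public static void main(String[] args) {
    int divisor = 10;
    long period = 1_666_000L; // hypothetical period id
    // Every profile and entity written during this period gets this same salt.
    System.out.println("period " + period + " -> salt " + salt(period, divisor));
    // Consecutive periods, by contrast, are spread across different salts.
    Set<Integer> seen = new HashSet<>();
    for (long p = period; p < period + 100; p++) {
      seen.add(salt(p, divisor));
    }
    System.out.println("distinct salts over 100 periods: " + seen.size());
  }
}
```

Because `%` is applied to a signed short, salts can be negative. This does not break the scheme, but the number of distinct salt values is roughly 2N - 1 rather than N.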


---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/metron/pull/622
  
    I also think this row key implementation would be very helpful, if not necessary, if we want to implement Profiler Client APIs in languages that aren't JVM-based. For example, the magic number can help identify byte-order differences, the version number lets us version format changes over time, and the size fields allow us to reliably parse each value from the row key.
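To illustrate how the magic number exposes byte-order differences: the diff writes `MAGIC_NUMBER = 77` (0x004D) as a big-endian short, so the first two bytes of every key are 0x00 0x4D. A reader that assumes the wrong order gets 0x4D00 (19712) instead of 77. Below is a minimal sketch that writes just the header and detects the order a key was written in. The class and method names are hypothetical, and the sketch stays in Java only for consistency with the rest of the thread; a non-JVM client would perform the same two reads.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical sketch: using the row key's leading magic number to detect byte order. */
public class MagicNumberSketch {

  public static final short MAGIC_NUMBER = 77; // same value as in the diff
  public static final byte VERSION = 1;

  /** Writes only the header fields (magic number, version) in the given byte order. */
  public static byte[] header(ByteOrder order) {
    return ByteBuffer.allocate(Short.BYTES + 1)
        .order(order)
        .putShort(MAGIC_NUMBER)
        .put(VERSION)
        .array();
  }

  /** Returns the byte order in which the leading short reads back as the magic number. */
  public static ByteOrder detectOrder(byte[] rowKey) {
    for (ByteOrder order : new ByteOrder[] {ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN}) {
      if (ByteBuffer.wrap(rowKey).order(order).getShort() == MAGIC_NUMBER) {
        return order;
      }
    }
    throw new IllegalArgumentException("Not a recognized row key");
  }

  public static void main(String[] args) {
    byte[] key = header(ByteOrder.BIG_ENDIAN);
    // Misreading the big-endian bytes 0x00 0x4D as little-endian yields 0x4D00.
    System.out.println(ByteBuffer.wrap(key).order(ByteOrder.LITTLE_ENDIAN).getShort()); // prints 19712
    System.out.println(detectOrder(key)); // prints BIG_ENDIAN
  }
}
```

This only works because 0x004D is not byte-symmetric. A magic number such as 0x4D4D would read the same in either order and could not distinguish them.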


---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen closed the pull request at:

    https://github.com/apache/metron/pull/622


---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129252984
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the Hbase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    end = Math.max(start, end);
    +    start = Math.min(start, end);
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    --- End diff --
    
    I'd argue that the lengths should be `short`s, not `int`s.  Remember, the key is replicated for each column, not just for each row, in the HBase internal structure.
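A rough size calculation makes the suggestion concrete. Each length prefix that shrinks from an `int` to a `short` saves 2 bytes per key, and the suggestion says the key is stored again for every column. The key in the diff has a length field for the salt, profile, and entity, a group count, and one length per group. With one group, that is 5 fields and 10 bytes per key. Below is a minimal arithmetic sketch of that layout (magic number, version, length-prefixed fields, two longs). The class name and example field sizes are hypothetical, and the salt is assumed to be the 4-byte `int` produced by `Bytes.toBytes(salt)` in the diff.

```java
/** Hypothetical arithmetic sketch: row key size with 4-byte vs 2-byte length prefixes. */
public class RowKeySizeSketch {

  /**
   * Size of a key laid out as in the diff: magic number (short), version (byte),
   * length-prefixed salt, profile, and entity, a group count, each group
   * length-prefixed, then two longs (period id and period duration).
   */
  public static int size(int prefixBytes, int saltLen, int profileLen, int entityLen, int... groupLens) {
    int size = Short.BYTES + 1;              // magic number + version
    size += prefixBytes + saltLen;           // salt
    size += prefixBytes + profileLen;        // profile name
    size += prefixBytes + entityLen;         // entity name
    size += prefixBytes;                     // number of groups
    for (int g : groupLens) {
      size += prefixBytes + g;               // each group
    }
    return size + 2 * Long.BYTES;            // period id + period duration
  }

  public static void main(String[] args) {
    // Hypothetical key: 4-byte salt, profile "p1", entity "10.0.0.1", one group "weekday".
    int withInts = size(Integer.BYTES, 4, 2, 8, 7);
    int withShorts = size(Short.BYTES, 4, 2, 8, 7);
    System.out.println(withInts + " vs " + withShorts); // prints 60 vs 50
  }
}
```

The `int` figure matches the `capacity` expression in the diff for the same inputs. A `short` can represent lengths up to 32767, well above the diff's `MAX_FIELD_LENGTH` of 1000, so the narrower prefix does not restrict any field the decoder would accept.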


---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129318110
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,382 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the Hbase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(1000, 15, TimeUnit.MINUTES);
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    end = Math.max(start, end);
    +    start = Math.min(start, end);
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = new String(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = new String(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = new String(groupBytes);
    +        groups.add(group);
    +      }
    +
    +      // decode the period
    +      long periodId = buffer.getLong();
    +      long duration = buffer.getLong();
    +      ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS);
    +
    +      return new ProfileMeasurement()
    +              .withProfileName(profile)
    +              .withEntity(entity)
    +              .withGroups(groups)
    +              .withProfilePeriod(period);
    +
    +    } catch(BufferUnderflowException e) {
    +      throw new IllegalArgumentException("Unable to decode the row key", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Builds the 'group' component of the row key.
    +   * @param groups The groups to include in the row key.
    +   */
    +  private byte[] encodeGroups(List<Object> groups) {
    +
    +    // encode each of the groups and determine their size
    +    int lengthOfGroups = 0;
    +    List<byte[]> groupBytes = new ArrayList<>();
    +    for(Object group : groups) {
    +      byte[] groupB = Bytes.toBytes(String.valueOf(group));
    +      groupBytes.add(groupB);
    +      lengthOfGroups += groupB.length;
    +    }
    --- End diff --
    
    Currently, it prepends each group with its length instead of using a delimiter. There are some unit tests that exercise this.
    
    Were you thinking this did not work at all, or were you suggesting a better way to do it?
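For reference, here is a minimal Java sketch of the length-prefixed scheme: a group count, then a length and the UTF-8 bytes for each group. `LengthPrefixedGroups` and its methods are hypothetical names, not the PR's code. The decoder reads exactly the announced number of bytes, so a group value can contain any character, including ones a delimiter-based scheme would have to escape. The sketch uses UTF-8 explicitly on both sides. The quoted `decode` calls `new String(bytes)`, which depends on the platform default charset, while `Bytes.toBytes(String)` always writes UTF-8.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LengthPrefixedGroups {

  // Writes the group count, then (length, bytes) for each group.
  static byte[] encodeGroups(List<Object> groups) {
    List<byte[]> encoded = new ArrayList<>();
    int size = Integer.BYTES; // room for the group count
    for (Object group : groups) {
      byte[] bytes = String.valueOf(group).getBytes(StandardCharsets.UTF_8);
      encoded.add(bytes);
      size += Integer.BYTES + bytes.length;
    }
    ByteBuffer buffer = ByteBuffer.allocate(size).putInt(encoded.size());
    for (byte[] bytes : encoded) {
      buffer.putInt(bytes.length).put(bytes);
    }
    return buffer.array();
  }

  // Reads the count, then exactly 'length' bytes per group; no delimiter scanning.
  static List<String> decodeGroups(byte[] encoded) {
    ByteBuffer buffer = ByteBuffer.wrap(encoded);
    int count = buffer.getInt();
    List<String> groups = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      byte[] bytes = new byte[buffer.getInt()];
      buffer.get(bytes);
      groups.add(new String(bytes, StandardCharsets.UTF_8));
    }
    return groups;
  }

  public static void main(String[] args) {
    // The second group contains characters a delimiter scheme might reserve.
    List<Object> groups = Arrays.asList("weekday", "a|b,c", 7);
    System.out.println(decodeGroups(encodeGroups(groups))); // [weekday, a|b,c, 7]
  }
}
```

Non-string groups come back as strings. This matches the quoted decoder, which also rebuilds every group with `new String(...)`.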


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r127331446
  
    --- Diff: metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/RowKeyBuilderFactory.java ---
    @@ -0,0 +1,125 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.client.stellar;
    +
    +import org.apache.commons.beanutils.PropertyUtils;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.metron.common.utils.ReflectionUtils;
    +import org.apache.metron.profiler.hbase.RowKeyBuilder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_ROW_KEY_BUILDER;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * A Factory class that can create a RowKeyBuilder based on global property values.
    + */
    +public class RowKeyBuilderFactory {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(RowKeyBuilderFactory.class);
    +
    +  /**
    +   * Create a RowKeyBuilder.
    +   * @param global The global properties.
    +   * @return A RowKeyBuilder instantiated using the global property values.
    +   */
    +  public static RowKeyBuilder create(Map<String, Object> global) {
    +    String rowKeyBuilderClass = PROFILER_ROW_KEY_BUILDER.get(global, String.class);
    +    LOG.debug("profiler client: {}={}", PROFILER_ROW_KEY_BUILDER, rowKeyBuilderClass);
    +
    +    // instantiate the RowKeyBuilder
    +    RowKeyBuilder builder = ReflectionUtils.createInstance(rowKeyBuilderClass);
    +    setSaltDivisor(global, builder);
    +    setPeriodDuration(global, builder);
    --- End diff --
    
    If I had some IoC-like functionality like Flux or Spring here, then this wouldn't be a problem at all.


[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129370546
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the Hbase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    end = Math.max(start, end);
    +    start = Math.min(start, end);
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = new String(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = new String(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = new String(groupBytes);
    +        groups.add(group);
    +      }
    --- End diff --
    
    An example in support: suppose you decode rowkeys and see a bunch of entity values that look like "192.168.52.122", "10.5.123.52", "192.147.130.204", "172.217.5.110". Of course, those are probably IP addresses. But are they source addresses or destination addresses? There's no way to tell without access to the Profile spec.
    
    Group values, of course, are potentially even worse.
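To make the point concrete, here is a stripped-down Java round trip of the field order written by the quoted `encode` method: magic number, version, salt, profile, entity, group count, period id, and period duration. This is a sketch, not the PR's code. The one-byte salt, the profile name `connections`, and the period values are made up for illustration, and the sketch writes no groups. Decoding recovers the profile name and the raw entity string, but nothing in the key records which message field produced the entity. That information lives only in the profile definition's `foreach` expression.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RowKeyRoundTrip {

  static final short MAGIC_NUMBER = 77;
  static final byte VERSION = 1;

  // Simplified layout: magic, version, salt, profile, entity, group count (zero), period id, duration.
  static byte[] encode(String profile, String entity, long periodId, long durationMillis) {
    byte[] salt = {(byte) (periodId % 16)}; // hypothetical one-byte salt
    byte[] p = profile.getBytes(StandardCharsets.UTF_8);
    byte[] e = entity.getBytes(StandardCharsets.UTF_8);
    int capacity = Short.BYTES + 1 + Integer.BYTES * 4 + salt.length + p.length + e.length + Long.BYTES * 2;
    return ByteBuffer.allocate(capacity)
        .putShort(MAGIC_NUMBER).put(VERSION)
        .putInt(salt.length).put(salt)
        .putInt(p.length).put(p)
        .putInt(e.length).put(e)
        .putInt(0) // no groups
        .putLong(periodId).putLong(durationMillis)
        .array();
  }

  // Reads one length-prefixed UTF-8 field.
  static String readField(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.getInt()];
    buffer.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  // Returns {profile, entity} recovered from the key.
  static String[] decode(byte[] rowKey) {
    ByteBuffer buffer = ByteBuffer.wrap(rowKey);
    if (buffer.getShort() != MAGIC_NUMBER || buffer.get() != VERSION) {
      throw new IllegalArgumentException("not a decodable row key");
    }
    readField(buffer); // skip the salt
    String profile = readField(buffer);
    String entity = readField(buffer);
    return new String[] {profile, entity};
  }

  public static void main(String[] args) {
    String[] decoded = decode(encode("connections", "10.5.123.52", 1_664_000L, 900_000L));
    // The key yields the profile and entity strings, but it does not say whether the
    // entity came from ip_src_addr or ip_dst_addr.
    System.out.println(decoded[0] + " -> " + decoded[1]); // connections -> 10.5.123.52
  }
}
```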


[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r128650596
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java ---
    @@ -81,20 +99,19 @@ public SaltyRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
        * @return All of the row keys necessary to retrieve the profile measurements.
        */
       @Override
    -  public List<byte[]> rowKeys(String profile, String entity, List<Object> groups, long start, long end) {
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
         // be forgiving of out-of-order start and end times; order is critical to this algorithm
         end = Math.max(start, end);
         start = Math.min(start, end);
    --- End diff --
    
    Heh, this has been in the code for a long time, but isn't this a bug? If they start out in the wrong order, say end is 1 and start is 5, won't this pair of statements leave both end and start equal to the larger value, i.e. 5? We need an intermediate variable for the swap!
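Here is a minimal Java sketch of the problem and one possible fix, assuming the intent is simply to swap the two values when they arrive out of order. `TimeRange` and `normalize` are hypothetical names, not part of the PR.

```java
public class TimeRange {

  // Returns {start, end} with start <= end, swapping through a temporary when needed.
  static long[] normalize(long start, long end) {
    if (start > end) {
      long tmp = start; // intermediate variable so neither value is lost
      start = end;
      end = tmp;
    }
    return new long[] {start, end};
  }

  public static void main(String[] args) {
    // The sequence from the diff, with start = 5 and end = 1.
    long start = 5, end = 1;
    end = Math.max(start, end);   // end becomes 5
    start = Math.min(start, end); // start stays 5; the original 1 is gone
    System.out.println("buggy: " + start + ", " + end); // buggy: 5, 5

    long[] fixed = normalize(5, 1);
    System.out.println("fixed: " + fixed[0] + ", " + fixed[1]); // fixed: 1, 5
  }
}
```

An equivalent fix computes `Math.min` and `Math.max` from the original inputs into two new variables before assigning either one back.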


[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on the issue:

    https://github.com/apache/metron/pull/622
  
    @cestella,
    > Would this approach require scans on read in the critical path?
    
    I don't perceive that decoding rowkeys is on any critical path. You only need to look up the Profile by serial number (or hash) when decoding rowkeys. No?
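To illustrate the distinction, here is a minimal sketch that assumes a hypothetical design in which the row key carries a short hash of the profile name instead of the name itself. The class `ProfileHashRegistry`, the choice of the first four bytes of an MD5 digest, and the profile names are all assumptions for illustration. Reading measurements needs only the forward hash, which the caller can compute from a profile name it already has. The reverse lookup against known profile definitions is needed only when decoding a key.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProfileHashRegistry {

  // Hypothetical key component: hex of the first 4 bytes of MD5(profile name).
  // Truncation keeps keys short but makes collisions possible; a real design would need to handle them.
  static String profileHash(String profile) {
    try {
      byte[] digest = MessageDigest.getInstance("MD5").digest(profile.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (int i = 0; i < 4; i++) {
        hex.append(String.format("%02x", digest[i]));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }

  private final Map<String, String> byHash = new HashMap<>();

  // Built from the known profile definitions; only the decode path uses it.
  ProfileHashRegistry(List<String> profileNames) {
    for (String name : profileNames) {
      byHash.put(profileHash(name), name);
    }
  }

  // Returns the profile name for a hash, or null if no known profile matches.
  String lookup(String hash) {
    return byHash.get(hash);
  }

  public static void main(String[] args) {
    // Read path: the caller knows the profile name and hashes forward; no lookup involved.
    String keyComponent = profileHash("connections");

    // Decode path: reverse the hash against the registry of known profiles.
    ProfileHashRegistry registry = new ProfileHashRegistry(Arrays.asList("connections", "bytes-out"));
    System.out.println(registry.lookup(keyComponent)); // connections
  }
}
```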


[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/622
  
    I would agree that it's easy to rewrite the old profiles in the new decodable format IF we can read the old profile keys and pull out their info (even fuzzily).


[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129319336
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the Hbase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    long earliest = Math.min(start, end);
    +    end = Math.max(start, end);
    +    start = earliest;
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    --- End diff --
    
    @mattf-horton The number of groups is encoded as a 'length' prefix, along with the size of each group name.  See the function `encodeGroups`.
    
    @cestella I would be fine with shorts instead of ints.
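    For comparison, here is a minimal, hypothetical sketch of the group encoding with 2-byte shorts in place of 4-byte ints: a group count, then a length prefix and the UTF-8 bytes for each group name. `ShortPrefixedGroups` is an illustrative name, not a class in the PR, and it assumes no group name exceeds the builder's `MAX_FIELD_LENGTH` of 1000 bytes, which fits comfortably in a short.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: groups encoded as a count followed by length-prefixed
 * UTF-8 names, using 2-byte shorts instead of 4-byte ints for the count and lengths.
 */
final class ShortPrefixedGroups {

  /** Same sanity limit as MAX_FIELD_LENGTH in DecodableRowKeyBuilder. */
  static final int MAX_FIELD_LENGTH = 1000;

  static byte[] encode(List<?> groups) {
    if (groups.size() > MAX_FIELD_LENGTH) {
      throw new IllegalArgumentException("Too many groups: " + groups.size());
    }
    List<byte[]> encoded = new ArrayList<>();
    int capacity = Short.BYTES; // room for the group count
    for (Object group : groups) {
      byte[] bytes = String.valueOf(group).getBytes(StandardCharsets.UTF_8);
      if (bytes.length > MAX_FIELD_LENGTH) {
        throw new IllegalArgumentException("Group name too long: " + bytes.length);
      }
      encoded.add(bytes);
      capacity += Short.BYTES + bytes.length; // length prefix + name
    }
    // ByteBuffer is big-endian by default, matching the builder's byte order
    ByteBuffer buffer = ByteBuffer.allocate(capacity);
    buffer.putShort((short) encoded.size());
    for (byte[] bytes : encoded) {
      buffer.putShort((short) bytes.length).put(bytes);
    }
    return buffer.array();
  }

  static List<String> decode(ByteBuffer buffer) {
    int count = buffer.getShort();
    if (count < 0 || count > MAX_FIELD_LENGTH) {
      throw new IllegalArgumentException("Invalid group count: " + count);
    }
    List<String> groups = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      int length = buffer.getShort();
      if (length < 0 || length > MAX_FIELD_LENGTH) {
        throw new IllegalArgumentException("Invalid group length: " + length);
      }
      byte[] bytes = new byte[length];
      buffer.get(bytes);
      groups.add(new String(bytes, StandardCharsets.UTF_8));
    }
    return groups;
  }
}
```

    For the two groups `weekend` and `x`, ints cost 4 + (4 + 7) + (4 + 1) = 20 bytes, while shorts cost 2 + (2 + 7) + (2 + 1) = 14 bytes.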



[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r127329548
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,382 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    --- End diff --
    
    The new `RowKeyBuilder` implementation that is decodable.  Everyone should just use this, but the older implementation is left for backwards compatibility.
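    To illustrate what "decodable" buys, the following is a simplified, hypothetical sketch that follows the same field order as `DecodableRowKeyBuilder`: magic number, version, length-prefixed salt, profile and entity, a group count with length-prefixed groups, then the period id and duration. It is not the PR's code; it works on plain strings and a caller-supplied salt instead of `ProfileMeasurement` and `ProfilePeriod`, so it runs without Metron or HBase on the classpath. `RowKeySketch` and `Decoded` are illustrative names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical sketch of the decodable row key layout (not Metron's code). */
final class RowKeySketch {
  static final short MAGIC_NUMBER = 77;
  static final byte VERSION = 1;
  static final int MAX_FIELD_LENGTH = 1000;

  /** Hypothetical holder for the fields recovered from a row key. */
  static final class Decoded {
    final String profile;
    final String entity;
    final List<String> groups;
    final long periodId;
    final long durationMillis;

    Decoded(String profile, String entity, List<String> groups, long periodId, long durationMillis) {
      this.profile = profile;
      this.entity = entity;
      this.groups = groups;
      this.periodId = periodId;
      this.durationMillis = durationMillis;
    }
  }

  static byte[] encode(byte[] salt, String profile, String entity,
                       List<String> groups, long periodId, long durationMillis) {
    List<byte[]> fields = new ArrayList<>();
    fields.add(salt);
    fields.add(profile.getBytes(StandardCharsets.UTF_8));
    fields.add(entity.getBytes(StandardCharsets.UTF_8));
    for (String group : groups) {
      fields.add(group.getBytes(StandardCharsets.UTF_8));
    }
    // magic + version + group count + period id + duration, plus a length prefix per field
    int capacity = Short.BYTES + 1 + Integer.BYTES + 2 * Long.BYTES;
    for (byte[] field : fields) {
      capacity += Integer.BYTES + field.length;
    }
    ByteBuffer buffer = ByteBuffer.allocate(capacity); // big-endian by default
    buffer.putShort(MAGIC_NUMBER).put(VERSION);
    for (byte[] field : fields.subList(0, 3)) { // salt, profile, entity
      buffer.putInt(field.length).put(field);
    }
    buffer.putInt(groups.size());
    for (byte[] field : fields.subList(3, fields.size())) { // the groups
      buffer.putInt(field.length).put(field);
    }
    buffer.putLong(periodId).putLong(durationMillis);
    return buffer.array();
  }

  static Decoded decode(byte[] rowKey) {
    ByteBuffer buffer = ByteBuffer.wrap(rowKey);
    if (buffer.getShort() != MAGIC_NUMBER) {
      throw new IllegalArgumentException("Invalid magic number");
    }
    if (buffer.get() != VERSION) {
      throw new IllegalArgumentException("Unsupported row key version");
    }
    getField(buffer); // the salt only spreads writes; it carries no profile data
    String profile = new String(getField(buffer), StandardCharsets.UTF_8);
    String entity = new String(getField(buffer), StandardCharsets.UTF_8);
    int count = buffer.getInt();
    if (count < 0 || count > MAX_FIELD_LENGTH) {
      throw new IllegalArgumentException("Invalid group count: " + count);
    }
    List<String> groups = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      groups.add(new String(getField(buffer), StandardCharsets.UTF_8));
    }
    long periodId = buffer.getLong();
    long durationMillis = buffer.getLong();
    return new Decoded(profile, entity, groups, periodId, durationMillis);
  }

  /** Reads one length-prefixed field, rejecting implausible lengths. */
  private static byte[] getField(ByteBuffer buffer) {
    int length = buffer.getInt();
    if (length < 0 || length > MAX_FIELD_LENGTH) {
      throw new IllegalArgumentException("Invalid field length: " + length);
    }
    byte[] field = new byte[length];
    buffer.get(field);
    return field;
  }
}
```

    Because every variable-length field carries its own length, the decoder never needs the profile definition to find field boundaries, and the magic number and version let it reject keys written in some other format.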



[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r127327990
  
    --- Diff: metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml ---
    @@ -29,7 +29,7 @@ components:
                 - name: "saltDivisor"
    --- End diff --
    
    I chose to require a flux file change here because this is not something that any old user should just change.  This is only configurable to allow for backwards compatibility.
    
    In addition, the IoC-like functionality that Flux provides makes it easier to set whatever configuration values are needed by a `RowKeyBuilder` (like salt divisor) without polluting the `RowKeyBuilder` interface.
    
    There is no such convenient IoC-like functionality when trying to instantiate the `RowKeyBuilder` from the client-side in `GetProfile`.
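    As a rough sketch of one client-side alternative, a builder could be named in configuration and instantiated reflectively through a no-arg constructor, which is the role the defaults-only constructor on `DecodableRowKeyBuilder` can play. `KeyBuilder`, `DefaultKeyBuilder` and `KeyBuilderFactory` are hypothetical stand-ins, not Metron types, and this is not how `GetProfile` is implemented.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for Metron's RowKeyBuilder interface (sketch only). */
interface KeyBuilder {
  byte[] rowKey(String profile, String entity, long periodId);
}

/** Hypothetical builder that relies entirely on built-in defaults. */
final class DefaultKeyBuilder implements KeyBuilder {
  @Override
  public byte[] rowKey(String profile, String entity, long periodId) {
    return (profile + "|" + entity + "|" + periodId).getBytes(StandardCharsets.UTF_8);
  }
}

/** Hypothetical client-side factory; the class name would come from configuration. */
final class KeyBuilderFactory {
  static KeyBuilder create(String className) {
    try {
      return Class.forName(className)
          .asSubclass(KeyBuilder.class) // ClassCastException if it is not a KeyBuilder
          .getDeclaredConstructor()     // requires a no-arg constructor
          .newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("Cannot create row key builder: " + className, e);
    }
  }
}
```

    The trade-off is that any parameters beyond the defaults (salt divisor, period) still have to reach the builder some other way, which is the gap Flux fills on the topology side.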



[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on the issue:

    https://github.com/apache/metron/pull/622
  
    Hi @nickwallen , 
    - I didn't say "unnecessarily complex", I just said "adds complexity".  If we are to have a decodable row key, then something like this would be necessary.  The point of my comments on this is to shift to a top-down perspective:
    - The requirements you're trying to address are, I think, to be able to query metadata about existing Profiles, for multiple reasons including to not have old Profiles become inaccessible due to loss of metadata needed for Profile queries.
    - I think a ToC is a better way to do that than a decodable rowkey.  It localizes the metadata instead of having to do a full scan and reason about all the rowkeys for every metadata query.  I proposed the Profile audit log as a very simple way to implement a ToC, and thanks for saying that better than I did.
    - I'm not against a _more_ decodable row key; in my mind it's just not the simplest or best solution to the evident requirement.  But it does constitute an architectural improvement.
    - As I briefly mentioned, I don't think our current row key is totally opaque, it just needs a brute-force approach to figure out.  Not suitable for interactive queries, but would be acceptable for a one-time pass to build (or re-build) the ToC.
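    A minimal sketch of the one-time pass that builds (or rebuilds) such a ToC, assuming the HBase scan and the per-key decoding have already produced one record per row key. `ProfileToc`, `KeyInfo` and `Entry` are hypothetical names; only the aggregation step is shown.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical table of contents built in one pass over decoded row keys. */
final class ProfileToc {

  /** Hypothetical: the fields recovered from one row key. */
  static final class KeyInfo {
    final String profile;
    final String entity;
    final long periodDurationMillis;

    KeyInfo(String profile, String entity, long periodDurationMillis) {
      this.profile = profile;
      this.entity = entity;
      this.periodDurationMillis = periodDurationMillis;
    }
  }

  /** One ToC entry per profile: the entities and period durations seen for it. */
  static final class Entry {
    final Set<String> entities = new TreeSet<>();
    final Set<Long> periodDurationsMillis = new TreeSet<>();
  }

  static Map<String, Entry> build(List<KeyInfo> decodedKeys) {
    Map<String, Entry> toc = new TreeMap<>();
    for (KeyInfo key : decodedKeys) {
      Entry entry = toc.computeIfAbsent(key.profile, profile -> new Entry());
      entry.entities.add(key.entity);
      entry.periodDurationsMillis.add(key.periodDurationMillis);
    }
    return toc;
  }
}
```

    Once built, a question like "which entities does this profile have?" becomes a map lookup instead of a full scan.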




[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/622
  
    Also, while we're in here, is there a strong reason why the prefixed hash is so large?  It's just there for uniformity of distribution, correct?  I'd propose a non-cryptographic hash for this purpose like Murmur.
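    As a sketch of a smaller alternative, the salt below is derived with the 64-bit finalizer (`fmix64`) from MurmurHash3 rather than MD5, and stored in 2 bytes. Note that `fmix64` is only the mixing step of Murmur3, not the full hash; a library implementation such as Guava's Murmur3 hash functions would be the more conventional choice. `CompactSalt` is a hypothetical name.

```java
import java.nio.ByteBuffer;

/** Hypothetical compact salt using a non-cryptographic mixer instead of MD5. */
final class CompactSalt {

  /** The 64-bit finalizer (fmix64) from MurmurHash3: cheap and well mixed, not cryptographic. */
  static long fmix64(long k) {
    k ^= k >>> 33;
    k *= 0xff51afd7ed558ccdL;
    k ^= k >>> 33;
    k *= 0xc4ceb9fe1a85ec53L;
    k ^= k >>> 33;
    return k;
  }

  /** Returns a 2-byte, big-endian salt in [0, saltDivisor). */
  static byte[] salt(long periodId, int saltDivisor) {
    if (saltDivisor <= 0 || saltDivisor > Short.MAX_VALUE) {
      throw new IllegalArgumentException("saltDivisor must be in [1, " + Short.MAX_VALUE + "]");
    }
    // floorMod keeps the bucket non-negative even when the mixed value is negative
    int bucket = (int) Math.floorMod(fmix64(periodId), (long) saltDivisor);
    return ByteBuffer.allocate(Short.BYTES).putShort((short) bucket).array();
  }
}
```

    One side effect of `Math.floorMod`: in the diff, `Bytes.toShort(hash) % saltDivisor` operates on a signed short, so it can yield negative values and up to 2N - 1 distinct salts for a divisor N, whereas `floorMod` keeps every salt in [0, N).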



[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129252678
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the Hbase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    long earliest = Math.min(start, end);
    +    end = Math.max(start, end);
    +    start = earliest;
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = Bytes.toString(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = Bytes.toString(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = Bytes.toString(groupBytes);
    +        groups.add(group);
    +      }
    +
    +      // decode the period
    +      long periodId = buffer.getLong();
    +      long duration = buffer.getLong();
    +      ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS);
    +
    +      return new ProfileMeasurement()
    +              .withProfileName(profile)
    +              .withEntity(entity)
    +              .withGroups(groups)
    +              .withProfilePeriod(period);
    +
    +    } catch(BufferUnderflowException e) {
    +      throw new IllegalArgumentException("Unable to decode the row key", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Builds the 'group' component of the row key.
    +   * @param groups The groups to include in the row key.
    +   */
    +  private byte[] encodeGroups(List<Object> groups) {
    +
    +    // encode each of the groups and determine their size
    +    int lengthOfGroups = 0;
    +    List<byte[]> groupBytes = new ArrayList<>();
    +    for(Object group : groups) {
    +      byte[] groupB = Bytes.toBytes(String.valueOf(group));
    +      groupBytes.add(groupB);
    +      lengthOfGroups += groupB.length;
    +    }
    +
    +    // encode each of the groups
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate((Integer.BYTES * (1 + groups.size())) + lengthOfGroups)
    +            .order(byteOrder)
    +            .putInt(groups.size());
    +
    +    for(byte[] groupB : groupBytes) {
    +      buffer.putInt(groupB.length).put(groupB);
    +    }
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period The ProfilePeriod in which the ProfileMeasurement was taken.
    +   */
    +  private static byte[] encodePeriod(ProfilePeriod period) {
    +    return encodePeriod(period.getPeriod());
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period the period
    +   */
    +  private static byte[] encodePeriod(long period) {
    +    return Bytes.toBytes(period);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the Hbase cluster.
    +   *
    +   * @param period The period in which a profile measurement is taken.
    +   */
    +  public static byte[] encodeSalt(ProfilePeriod period, int saltDivisor) {
    +    return encodeSalt(period.getPeriod(), saltDivisor);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the Hbase cluster.
    +   *
    +   * @param period The period
    +   * @param saltDivisor The salt divisor
    +   */
    +  public static byte[] encodeSalt(long period, int saltDivisor) {
    +    try {
    +      // an MD5 is 16 bytes aka 128 bits
    +      MessageDigest digest = MessageDigest.getInstance("MD5");
    +      byte[] hash = digest.digest(encodePeriod(period));
    +      int salt = Bytes.toShort(hash) % saltDivisor;
    +      return Bytes.toBytes(salt);
    --- End diff --
    
    The other reason to prefix with a hash is the situation where you have unbalanced keys: one profile has a lot of data while the other profiles have much less. When we want to anticipate splits, a uniform distribution of keys helps because you can simply pre-split uniformly, rather than pre-splitting with a bias toward a given key. Prefixing with a uniformly distributed hash yields a uniformly distributed key space, so uniform pre-splitting is enough.
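    A minimal sketch of hash-prefix salting plus uniform pre-split points, assuming the 4-byte salt is the leading field of the row key. In the quoted DecodableRowKeyBuilder layout the salt actually follows the magic number, version, and salt length, so real split keys would need that fixed prefix as well. The sketch also uses Math.floorMod: the quoted encodeSalt computes `Bytes.toShort(hash) % saltDivisor`, and Java's `%` returns a negative result for a negative operand, so roughly half of those salts would be negative. SaltSketch, salt(), and splitKeys() are hypothetical names for illustration only.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative sketch; not the PR's implementation.
class SaltSketch {

  // Salt in [0, saltDivisor) derived from the MD5 of the big-endian period id.
  // Math.floorMod keeps the result non-negative even when the hashed prefix is negative.
  static int salt(long periodId, int saltDivisor) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      byte[] hash = md5.digest(ByteBuffer.allocate(Long.BYTES).putLong(periodId).array());
      int prefix = ByteBuffer.wrap(hash).getInt(); // first 4 of the 16 digest bytes
      return Math.floorMod(prefix, saltDivisor);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  // One split point at the start of each salt bucket after the first. Because the
  // salts are roughly uniform, evenly spaced splits give regions of similar size.
  // Non-negative big-endian ints sort in numeric order as unsigned bytes.
  static byte[][] splitKeys(int saltDivisor) {
    byte[][] splits = new byte[saltDivisor - 1][];
    for (int s = 1; s < saltDivisor; s++) {
      splits[s - 1] = ByteBuffer.allocate(Integer.BYTES).putInt(s).array();
    }
    return splits;
  }
}
```

    With N salt buckets this yields N - 1 split keys, one per bucket boundary, independent of how much data any single profile writes.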


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on the issue:

    https://github.com/apache/metron/pull/622
  
    Your proposal has the advantage of making data in HBase self-identifying (if one has the key), which I always like.  However, it's a large change and introduces yet more complexity.  There's an alternative I've been noodling on occasionally, which I put forward here for consideration:
    
    Create a Profile Audit Log table in HBase.  Every time a Profiler is configured, started, or stopped, make one entry in the audit log.  The idea is to be able to answer exactly the kinds of questions posed in METRON-450, so the records should include things like the configuration, the first and last timestamps, and perhaps the key builder parameters.  This would prevent historical profiles from being "lost" simply because the would-be querier no longer has the exact configuration parameters that were used to write them.
    
    For the sake of housekeeping, one might run a scan, daily and/or at system restart, to ensure that (a) the set of Profiles with a "start" but no "end" recorded in the audit log and (b) the set of currently running Profiles are actually consistent, recording "inferred end" entries in the audit log for any orphans found.
    
    This solution is somewhat backward-applicable to existing Profile data; I think there are brute-force ways to scan the existing HBase tables and infer audit log entries, especially if historical configuration data is still available.  We could write such a scanner.
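    A minimal in-memory sketch of the housekeeping pass described above, assuming each audit entry carries just a profile name, an action, and a timestamp (a real entry would also hold the configuration, first/last timestamps, and key builder parameters). AuditLogSketch, Action, and reconcile() are hypothetical names, not a Metron API; a real version would read and write an HBase table instead of a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the proposed Profile Audit Log.
class AuditLogSketch {

  enum Action { CONFIGURED, STARTED, STOPPED, INFERRED_END }

  static final class Entry {
    final String profile;
    final Action action;
    final long timestamp;

    Entry(String profile, Action action, long timestamp) {
      this.profile = profile;
      this.action = action;
      this.timestamp = timestamp;
    }
  }

  final List<Entry> log = new ArrayList<>();

  void record(String profile, Action action, long timestamp) {
    log.add(new Entry(profile, action, timestamp));
  }

  // Profiles whose most recent "start" has no later "stop" or "inferred end".
  Set<String> openProfiles() {
    Map<String, Boolean> open = new HashMap<>();
    for (Entry e : log) {
      if (e.action == Action.STARTED) open.put(e.profile, true);
      if (e.action == Action.STOPPED || e.action == Action.INFERRED_END) open.put(e.profile, false);
    }
    Set<String> result = new HashSet<>();
    open.forEach((profile, isOpen) -> { if (isOpen) result.add(profile); });
    return result;
  }

  // Open in the log but not actually running => orphan; record an inferred end for each.
  Set<String> reconcile(Set<String> running, long now) {
    Set<String> orphans = new HashSet<>(openProfiles());
    orphans.removeAll(running);
    for (String profile : orphans) record(profile, Action.INFERRED_END, now);
    return orphans;
  }
}
```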


---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r127326080
  
    --- Diff: metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/RowKeyBuilderFactory.java ---
    @@ -0,0 +1,125 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.client.stellar;
    +
    +import org.apache.commons.beanutils.PropertyUtils;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.metron.common.utils.ReflectionUtils;
    +import org.apache.metron.profiler.hbase.RowKeyBuilder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_ROW_KEY_BUILDER;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * A Factory class that can create a RowKeyBuilder based on global property values.
    + */
    +public class RowKeyBuilderFactory {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(RowKeyBuilderFactory.class);
    +
    +  /**
    +   * Create a RowKeyBuilder.
    +   * @param global The global properties.
    +   * @return A RowKeyBuilder instantiated using the global property values.
    +   */
    +  public static RowKeyBuilder create(Map<String, Object> global) {
    +    String rowKeyBuilderClass = PROFILER_ROW_KEY_BUILDER.get(global, String.class);
    +    LOG.debug("profiler client: {}={}", PROFILER_ROW_KEY_BUILDER, rowKeyBuilderClass);
    +
    +    // instantiate the RowKeyBuilder
    +    RowKeyBuilder builder = ReflectionUtils.createInstance(rowKeyBuilderClass);
    +    setSaltDivisor(global, builder);
    +    setPeriodDuration(global, builder);
    --- End diff --
    
    I don't really like how I go about setting the salt divisor and period duration on the `RowKeyBuilder`.  There are no methods in the `RowKeyBuilder` interface to set these values.  I could add something like `RowKeyBuilder.setSaltDivisor`, but I was trying not to pollute that interface with settings like the salt divisor that may not apply to all `RowKeyBuilder` implementations.
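    One possible alternative, sketched under assumptions: keep the base interface minimal and let a builder opt in to small "capability" interfaces, which the factory detects with `instanceof`. KeyBuilder stands in for `RowKeyBuilder`; SaltDivisorAware, PeriodDurationAware, SaltedKeyBuilder, KeyBuilderFactory, and the property names are all hypothetical and not what the PR does. The defaults (1000, 15 minutes) mirror the no-arg DecodableRowKeyBuilder constructor quoted later in the thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Stand-in for RowKeyBuilder: no salt- or period-specific methods.
interface KeyBuilder {
  byte[] encode(String profile, long periodId);
}

// Optional capabilities a builder may implement.
interface SaltDivisorAware { void setSaltDivisor(int saltDivisor); }
interface PeriodDurationAware { void setPeriodDurationMillis(long periodDurationMillis); }

class SaltedKeyBuilder implements KeyBuilder, SaltDivisorAware, PeriodDurationAware {
  int saltDivisor = 1000;
  long periodDurationMillis = 15 * 60 * 1000L;

  @Override public void setSaltDivisor(int saltDivisor) { this.saltDivisor = saltDivisor; }
  @Override public void setPeriodDurationMillis(long millis) { this.periodDurationMillis = millis; }

  // Toy encoding, just enough to show the salt divisor being used.
  @Override public byte[] encode(String profile, long periodId) {
    String key = Math.floorMod(periodId, (long) saltDivisor) + ":" + profile + ":" + periodId;
    return key.getBytes(StandardCharsets.UTF_8);
  }
}

class KeyBuilderFactory {
  // Hypothetical global property names chosen for this sketch.
  static final String SALT_DIVISOR = "saltDivisor";
  static final String PERIOD_MILLIS = "periodDurationMillis";

  // Only builders that opt in to a capability receive the corresponding setting.
  static KeyBuilder configure(KeyBuilder builder, Map<String, Object> global) {
    Object salt = global.get(SALT_DIVISOR);
    if (salt != null && builder instanceof SaltDivisorAware) {
      ((SaltDivisorAware) builder).setSaltDivisor(((Number) salt).intValue());
    }
    Object period = global.get(PERIOD_MILLIS);
    if (period != null && builder instanceof PeriodDurationAware) {
      ((PeriodDurationAware) builder).setPeriodDurationMillis(((Number) period).longValue());
    }
    return builder;
  }
}
```

    A builder that ignores salting simply does not implement SaltDivisorAware, and the factory leaves it untouched.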


---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/622
  
    @mattf-horton Wouldn't you have to use the serial number to retrieve profiles?


---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129187853
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the Hbase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    end = Math.max(start, end);
    +    start = Math.min(start, end);
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    --- End diff --
    
    I understand that you don't _need_ a length for groupB, but I would recommend including one, both for consistency with the length/value (L/V) encoding of the other fields, and in case we later add a field after the groups.
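    A minimal sketch of what a length-prefixed groups section could look like: `[sectionLength][count]([len][bytes])*`. With the section length present, a reader can skip the groups entirely, or tolerate bytes a newer writer appends, and still land on the next field. GroupSection is a hypothetical helper, not the PR's code. It also uses UTF-8 explicitly on both sides; the quoted decode calls `new String(bytes)`, which uses the platform default charset, while HBase's `Bytes.toBytes(String)` encodes UTF-8.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative L/V encoding of the 'groups' section.
class GroupSection {

  static byte[] encode(List<String> groups) {
    List<byte[]> encoded = new ArrayList<>();
    int body = Integer.BYTES; // space for the group count
    for (String g : groups) {
      byte[] b = g.getBytes(StandardCharsets.UTF_8);
      encoded.add(b);
      body += Integer.BYTES + b.length;
    }
    ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + body);
    buf.putInt(body).putInt(groups.size());
    for (byte[] b : encoded) buf.putInt(b.length).put(b);
    return buf.array();
  }

  // Reads the section and leaves 'buf' positioned at whatever field follows it.
  static List<String> decode(ByteBuffer buf) {
    int body = buf.getInt();
    int end = buf.position() + body;
    int count = buf.getInt();
    List<String> groups = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      byte[] b = new byte[buf.getInt()];
      buf.get(b);
      groups.add(new String(b, StandardCharsets.UTF_8));
    }
    buf.position(end); // skip anything a newer writer appended to the section
    return groups;
  }

  // A reader that does not need the groups can jump over the whole section.
  static void skip(ByteBuffer buf) {
    int body = buf.getInt();
    buf.position(buf.position() + body);
  }
}
```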


---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/622
  
    @mattf-horton Would this approach require scans on read in the critical path?


---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/metron/pull/622
  
    @nickwallen I haven't been following this discussion, but it seems like a useful feature / enhancement that has been sitting for a while since active discussion petered out. What are the next steps here?  Does this PR need changes?  Should the discussion be revived on the user lists?  It doesn't seem like there was any consensus on the approach, but again, I like this enhancement a lot.


---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129193892
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,382 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the Hbase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(1000, 15, TimeUnit.MINUTES);
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    end = Math.max(start, end);
    +    start = Math.min(start, end);
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = new String(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = new String(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = new String(groupBytes);
    +        groups.add(group);
    +      }
    +
    +      // decode the period
    +      long periodId = buffer.getLong();
    +      long duration = buffer.getLong();
    +      ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS);
    +
    +      return new ProfileMeasurement()
    +              .withProfileName(profile)
    +              .withEntity(entity)
    +              .withGroups(groups)
    +              .withProfilePeriod(period);
    +
    +    } catch(BufferUnderflowException e) {
    +      throw new IllegalArgumentException("Unable to decode the row key", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Builds the 'group' component of the row key.
    +   * @param groups The groups to include in the row key.
    +   */
    +  private byte[] encodeGroups(List<Object> groups) {
    +
    +    // encode each of the groups and determine their size
    +    int lengthOfGroups = 0;
    +    List<byte[]> groupBytes = new ArrayList<>();
    +    for(Object group : groups) {
    +      byte[] groupB = Bytes.toBytes(String.valueOf(group));
    +      groupBytes.add(groupB);
    +      lengthOfGroups += groupB.length;
    +    }
    --- End diff --
    
    This output format does not provide delimiters between the (possibly several) group values.  I recommend using a comma (",") or another such character to delimit group values when more than one is present.  If a comma can appear in the group values, have the user provide the delimiter character.
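A minimal sketch of the delimiter approach suggested in this comment, assuming a non-empty, user-supplied delimiter that never appears inside a group value. `DelimitedGroups` and its methods are hypothetical and not part of Metron.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical helper that stores all group values as one delimited string.
 * Assumes a non-empty, user-supplied delimiter that never appears in a group value.
 */
final class DelimitedGroups {

  private DelimitedGroups() {}

  static byte[] encode(List<Object> groups, String delimiter) {
    requireDelimiter(delimiter);
    List<String> values = new ArrayList<>();
    for (Object group : groups) {
      String value = String.valueOf(group);
      // reject values that would be split apart on decode
      if (value.contains(delimiter)) {
        throw new IllegalArgumentException("Group value contains the delimiter: " + value);
      }
      values.add(value);
    }
    return String.join(delimiter, values).getBytes(StandardCharsets.UTF_8);
  }

  static List<String> decode(byte[] encoded, String delimiter) {
    requireDelimiter(delimiter);
    String joined = new String(encoded, StandardCharsets.UTF_8);
    if (joined.isEmpty()) {
      // caveat: an empty list and a single empty group both encode to zero bytes
      return new ArrayList<>();
    }
    // Pattern.quote matches the delimiter literally; limit -1 keeps trailing empty values
    return new ArrayList<>(Arrays.asList(joined.split(Pattern.quote(delimiter), -1)));
  }

  private static void requireDelimiter(String delimiter) {
    if (delimiter == null || delimiter.isEmpty()) {
      throw new IllegalArgumentException("A non-empty delimiter is required");
    }
  }
}
```

The trade-off: a delimiter must be forbidden (or escaped) inside group values, whereas writing a length before each group, as the full `encodeGroups` method quoted elsewhere in this thread does, needs no reserved character.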


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129186527
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.
    + * <li>profile - The name of the profile.
    + * <li>entity - The name of the entity being profiled.
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines some level of sane max field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the Hbase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    if (start > end) {
    +      long swap = start;
    +      start = end;
    +      end = swap;
    +    }
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the rowkeys for
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param period The period in which the measurement was taken.
    +   * @param groups The groups.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = new String(profileBytes);
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = new String(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = new String(groupBytes);
    +        groups.add(group);
    +      }
    +
    +      // decode the period
    +      long periodId = buffer.getLong();
    +      long duration = buffer.getLong();
    +      ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS);
    +
    +      return new ProfileMeasurement()
    +              .withProfileName(profile)
    +              .withEntity(entity)
    +              .withGroups(groups)
    +              .withProfilePeriod(period);
    +
    +    } catch(BufferUnderflowException e) {
    +      throw new IllegalArgumentException("Unable to decode the row key", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Builds the 'group' component of the row key.
    +   * @param groups The groups to include in the row key.
    +   */
    +  private byte[] encodeGroups(List<Object> groups) {
    +
    +    // encode each of the groups and determine their size
    +    int lengthOfGroups = 0;
    +    List<byte[]> groupBytes = new ArrayList<>();
    +    for(Object group : groups) {
    +      byte[] groupB = Bytes.toBytes(String.valueOf(group));
    +      groupBytes.add(groupB);
    +      lengthOfGroups += groupB.length;
    +    }
    +
    +    // encode each of the groups
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate((Integer.BYTES * (1 + groups.size())) + lengthOfGroups)
    +            .order(byteOrder)
    +            .putInt(groups.size());
    +
    +    for(byte[] groupB : groupBytes) {
    +      buffer.putInt(groupB.length).put(groupB);
    +    }
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period The ProfilePeriod in which the ProfileMeasurement was taken.
    +   */
    +  private static byte[] encodePeriod(ProfilePeriod period) {
    +    return encodePeriod(period.getPeriod());
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period the period
    +   */
    +  private static byte[] encodePeriod(long period) {
    +    return Bytes.toBytes(period);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the Hbase cluster.
    +   *
    +   * @param period The period in which a profile measurement is taken.
    +   */
    +  public static byte[] encodeSalt(ProfilePeriod period, int saltDivisor) {
    +    return encodeSalt(period.getPeriod(), saltDivisor);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the Hbase cluster.
    +   *
    +   * @param period The period
    +   * @param saltDivisor The salt divisor
    +   */
    +  public static byte[] encodeSalt(long period, int saltDivisor) {
    +    try {
    +      // an MD5 is 16 bytes aka 128 bits
    +      MessageDigest digest = MessageDigest.getInstance("MD5");
    +      byte[] hash = digest.digest(encodePeriod(period));
    +      int salt = Bytes.toShort(hash) % saltDivisor;
    +      return Bytes.toBytes(salt);
    --- End diff --
    
    Because the salt depends only on periodId and saltDivisor, and most Profiles will use the default saltDivisor, it does not prevent hotspotting when multiple running Profiles share the same periodDuration: in each period, all of those Profiles write to the same region at the same time.  To fix this, something unique to each Profile, such as a hash of the Profile specification, should be mixed into the salt.
    Since this PR changes so much about row keys, it should fix this problem too.
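A minimal sketch of the suggested fix, assuming the profile name stands in for "something unique to each Profile" (a real implementation might hash the full Profile specification instead). `ProfileAwareSalt` is hypothetical. It also uses `Math.floorMod`, so the salt always falls in `[0, saltDivisor)`, which `Bytes.toShort(hash) % saltDivisor` in the diff does not guarantee for negative hash values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical salt that mixes the profile identity into the period hash. */
final class ProfileAwareSalt {

  private ProfileAwareSalt() {}

  /** Computes md5(profile || periodId) mod saltDivisor. */
  static int salt(String profile, long periodId, int saltDivisor) {
    if (saltDivisor <= 0) {
      throw new IllegalArgumentException("saltDivisor must be positive");
    }
    byte[] profileBytes = profile.getBytes(StandardCharsets.UTF_8);
    byte[] input = ByteBuffer.allocate(profileBytes.length + Long.BYTES)
        .put(profileBytes)
        .putLong(periodId)
        .array();
    try {
      byte[] hash = MessageDigest.getInstance("MD5").digest(input);
      // use the first 4 bytes of the 16-byte digest as a big-endian int
      int h = ByteBuffer.wrap(hash).getInt();
      // floorMod keeps the result non-negative even when h is negative
      return Math.floorMod(h, saltDivisor);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  /** Encodes the salt as 4 bytes, the same width Bytes.toBytes(int) produces. */
  static byte[] encodeSalt(String profile, long periodId, int saltDivisor) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(salt(profile, periodId, saltDivisor)).array();
  }
}
```

With the profile folded into the hash, two profiles that share a period duration land in different salt buckets for the same period in most cases, instead of always landing in the same one.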



---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on the issue:

    https://github.com/apache/metron/pull/622
  
    @cestella, tl;dr: The discussion of serial numbers is a distraction.  Let's just use the profileHash and forget the serial number.  It was a micro-optimization.
    
    Answer to your question: there are two cases.
    - If you have the profileHash, then you can look up the Profile using an HBase prefix (wildcard) query for row keys matching \<profileHash\>\*, and since the profileHash is unique, it will be essentially as efficient as using the full row key.
    - If you are trying to decode a rowkey and only have the serial number then I stated some assumptions: "The expectation is that we will seldom (almost never) need to reference back to the Profile specification, and the total usage of Profile specs will be human-scale finite, **so it is okay to "scan" the ProfileSpecs table to find the full Profile spec referenced by a profileSN.** If this is not true, use the full hash as both the rowkey in the PeriodSpecs table, and as the reference element in the Profile rowkeys."
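To illustrate why a lookup by \<profileHash\>\* is cheap: HBase keeps rows sorted by key, so all rows that share a prefix form one contiguous range that a scan can bound with a start row and a stop row. This sketch uses a `TreeMap` with unsigned lexicographic byte ordering as a stand-in for an HBase table; `PrefixScan` and its methods are hypothetical, not HBase client APIs.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.NavigableMap;

/** Hypothetical helpers for reading every row whose key starts with a given prefix. */
final class PrefixScan {

  private PrefixScan() {}

  /** Orders keys lexicographically, comparing bytes as unsigned values (as HBase does). */
  static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  };

  /**
   * Smallest key greater than every key that starts with the prefix: drop trailing
   * 0xFF bytes, then increment the last remaining byte. Returns null if the prefix is
   * empty or all 0xFF, meaning the scan must run to the end of the table.
   */
  static byte[] stopRowForPrefix(byte[] prefix) {
    for (int i = prefix.length - 1; i >= 0; i--) {
      if (prefix[i] != (byte) 0xff) {
        byte[] stop = Arrays.copyOf(prefix, i + 1);
        stop[i]++;
        return stop;
      }
    }
    return null;
  }

  /** Returns the contiguous range of rows whose keys start with the prefix. */
  static <V> NavigableMap<byte[], V> scanPrefix(NavigableMap<byte[], V> table, byte[] prefix) {
    byte[] stop = stopRowForPrefix(prefix);
    return stop == null
        ? table.tailMap(prefix, true)
        : table.subMap(prefix, true, stop, false);
  }
}
```

The map passed to `scanPrefix` must have been built with `UNSIGNED_LEXICOGRAPHIC`; the range lookup only touches the matching keys, which is why a unique prefix is nearly as efficient as an exact key.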


---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/622
  
    I want to point out that I am also in favor of an audit log for the profiler, but I don't think it's a complete solution for the batch analytics use-case.


---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/622
  
    So, in my mind, the feature here is enabling batch analytics on the profiles.  To that end, I'm generally in favor of a decodable row key.  I think the question really isn't a ToC *or* a decodable row key; rather, we will want both.  The two will follow different access patterns.  A decodable row key without a ToC will be suitable only for full-table-scan-style access.  A ToC would enable slicing and dicing by profile, entity, etc.  
    
    That being said, a ToC without a decodable rowkey is substantially less nice.  Without being able to decode the rowkey, we will not be able to regenerate the ToC to provide alternative indexing.  I see this as a first step to enable a broader discussion on just what kind of access semantics beyond Get/Put we want to place on the profiles.
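A minimal sketch of regenerating a ToC from decodable row keys, as described above: decode every key returned by a full-table scan and group entity names under their profile. It assumes a simplified, hypothetical key layout (`[int length][profile][int length][entity][long period]`) loosely modeled on the length-prefixed fields of `DecodableRowKeyBuilder`; the salt, magic number, version, and groups are omitted, and `TocRebuilder` is not a Metron class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch: rebuild a profile -> entities table of contents from scanned keys. */
final class TocRebuilder {

  private TocRebuilder() {}

  /** Encodes the simplified key layout used by this sketch. */
  static byte[] encode(String profile, String entity, long period) {
    byte[] p = profile.getBytes(StandardCharsets.UTF_8);
    byte[] e = entity.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(Integer.BYTES * 2 + p.length + e.length + Long.BYTES)
        .putInt(p.length).put(p)
        .putInt(e.length).put(e)
        .putLong(period)
        .array();
  }

  /** Reads one length-prefixed UTF-8 field; assumes a well-formed key. */
  private static String readString(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.getInt()];
    buffer.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  /** Decodes each scanned key and groups entity names under their profile name. */
  static Map<String, SortedSet<String>> rebuild(List<byte[]> scannedKeys) {
    Map<String, SortedSet<String>> toc = new TreeMap<>();
    for (byte[] key : scannedKeys) {
      ByteBuffer buffer = ByteBuffer.wrap(key);
      String profile = readString(buffer);
      String entity = readString(buffer);
      toc.computeIfAbsent(profile, k -> new TreeSet<>()).add(entity);
    }
    return toc;
  }
}
```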
    
    All that to say, I'm in favor of the effort.  I do worry about the impact on existing profiles going forward, though.  From the point where we do this, we will create a fork where new profiles and old profiles diverge.  I think we need to discuss the migration story more explicitly and see whether it is plausible to create a fuzzy migration tool (i.e. one that will look at the existing profiles and try to pick them apart).
    
    I'd be ok for that work to be a follow-on, but I would want the plan to be very explicit and I would be -1 for a release until it's in.


---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/622
  
    I strongly vote that you move these changes to a new RowKeyBuilder and keep the old one default until a major release.  The sad thing is, I don't think there *is* a way to migrate the old data, so we're kinda screwed here in terms of backwards compatibility.  I definitely agree that we should move to a profiler row key that is decodable, though, so it's very good work @nickwallen .


---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/metron/pull/622
  
    > Your proposal has the advantage of making data in HBase self-identifying (if one has the key), which I always like. However, it's a large change and induces yet more complexity
    
    What do you find unnecessarily complex here?  The code base was already designed to accept different row key implementations.  So this change involves the following.
    
    1. The new decodable row key 
    2. Profiler client logic to instantiate row key builders
    3. Profiler client logic to pass parameters to the instantiated row key builders
    
    I agree that item 3 is unnecessarily complex.  That's where I wanted feedback.  I think just passing parameters through an interface method would simplify this a lot.
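A minimal sketch of what passing parameters through an interface method could look like. `ConfigurableRowKeyBuilder`, `configure`, `ExampleRowKeyBuilder`, and `RowKeyBuilders` are hypothetical, and the property keys and defaults are placeholders; the real global property names live in `ProfilerConfig`. In the real factory the instance would still be created from the configured class name; a `Supplier` stands in for that here.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical interface: each builder reads only the settings it understands. */
interface ConfigurableRowKeyBuilder {
  void configure(Map<String, Object> global);
}

/** Hypothetical builder; the property keys and defaults are placeholders for illustration. */
class ExampleRowKeyBuilder implements ConfigurableRowKeyBuilder {

  static final String SALT_DIVISOR = "saltDivisor";
  static final String PERIOD = "period";
  static final String PERIOD_UNITS = "periodUnits";

  int saltDivisor = 1000;
  long periodDurationMillis = TimeUnit.MINUTES.toMillis(15);

  @Override
  public void configure(Map<String, Object> global) {
    Object divisor = global.get(SALT_DIVISOR);
    if (divisor != null) {
      saltDivisor = Integer.parseInt(String.valueOf(divisor));
    }
    Object period = global.get(PERIOD);
    if (period != null) {
      // units default to MINUTES when only the period is given
      TimeUnit units = TimeUnit.valueOf(String.valueOf(global.getOrDefault(PERIOD_UNITS, "MINUTES")));
      periodDurationMillis = units.toMillis(Long.parseLong(String.valueOf(period)));
    }
  }
}

/** Hypothetical factory: no bean-property reflection, just one interface call. */
final class RowKeyBuilders {

  private RowKeyBuilders() {}

  static <T extends ConfigurableRowKeyBuilder> T create(Supplier<T> constructor, Map<String, Object> global) {
    T builder = constructor.get();
    builder.configure(global);
    return builder;
  }
}
```

The design choice here is that the factory no longer needs to know which settings a builder accepts; each implementation decides what to read from the global properties.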



---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r127329289
  
    --- Diff: metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/RowKeyBuilderFactory.java ---
    @@ -0,0 +1,125 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.client.stellar;
    +
    +import org.apache.commons.beanutils.PropertyUtils;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.metron.common.utils.ReflectionUtils;
    +import org.apache.metron.profiler.hbase.RowKeyBuilder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_ROW_KEY_BUILDER;
    +import static org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * A Factory class that can create a RowKeyBuilder based on global property values.
    + */
    +public class RowKeyBuilderFactory {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(RowKeyBuilderFactory.class);
    +
    +  /**
    +   * Create a RowKeyBuilder.
    +   * @param global The global properties.
    +   * @return A RowKeyBuilder instantiated using the global property values.
    +   */
    +  public static RowKeyBuilder create(Map<String, Object> global) {
    +    String rowKeyBuilderClass = PROFILER_ROW_KEY_BUILDER.get(global, String.class);
    +    LOG.debug("profiler client: {}={}", PROFILER_ROW_KEY_BUILDER, rowKeyBuilderClass);
    +
    +    // instantiate the RowKeyBuilder
    +    RowKeyBuilder builder = ReflectionUtils.createInstance(rowKeyBuilderClass);
    +    setSaltDivisor(global, builder);
    +    setPeriodDuration(global, builder);
    +
    +    return builder;
    +  }
    +
    +  /**
    +   * Set the period duration on the RowKeyBuilder.
    +   * @param global The global properties from Zk.
    +   * @param builder The RowKeyBuilder implementation.
    +   */
    +  private static void setPeriodDuration(Map<String, Object> global, RowKeyBuilder builder) {
    +
    +    // how long is the profile period?
    +    long duration = PROFILER_PERIOD.get(global, Long.class);
    +    LOG.debug("profiler client: {}={}", PROFILER_PERIOD, duration);
    +
    +    // which units are used to define the profile period?
    +    String configuredUnits = PROFILER_PERIOD_UNITS.get(global, String.class);
    +    TimeUnit units = TimeUnit.valueOf(configuredUnits);
    +    LOG.debug("profiler client: {}={}", PROFILER_PERIOD_UNITS, units);
    +
    +    // set the period duration
    +    final String periodDurationProperty = "periodDurationMillis";
    +    setProperty(builder, periodDurationProperty, units.toMillis(duration));
    +  }
    +
    +  /**
    +   * Set the salt divisor property on the RowKeyBuilder.
    +   * @param global The global properties from Zk.
    +   * @param builder The RowKeyBuilder implementation.
    +   */
    +  private static void setSaltDivisor(Map<String, Object> global, RowKeyBuilder builder) {
    +
    +    // what is the salt divisor?
    +    Integer saltDivisor = PROFILER_SALT_DIVISOR.get(global, Integer.class);
    +    LOG.debug("profiler client: {}={}", PROFILER_SALT_DIVISOR, saltDivisor);
    +
    +    final String saltDivisorProperty = "saltDivisor";
    +    setProperty(builder, saltDivisorProperty, saltDivisor);
    --- End diff --
    
    This basically sets the 'salt divisor' on any `RowKeyBuilder` that has a `saltDivisor` setter.  I really don't like this.  It is very hack-ish. I would love to use a simpler alternative.
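Another option, sketched under the assumption that every builder exposes the same `(int saltDivisor, long duration, TimeUnit units)` constructor that `DecodableRowKeyBuilder` in this PR already declares: the factory can look up that one constructor by reflection instead of probing for bean setters. `SaltedBuilder` and `ConstructorFactory` are hypothetical.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a RowKeyBuilder with an (int, long, TimeUnit) constructor. */
class SaltedBuilder {
  final int saltDivisor;
  final long periodDurationMillis;

  public SaltedBuilder(int saltDivisor, long duration, TimeUnit units) {
    this.saltDivisor = saltDivisor;
    this.periodDurationMillis = units.toMillis(duration);
  }
}

/** Hypothetical factory that instantiates a builder through one explicit constructor. */
final class ConstructorFactory {

  private ConstructorFactory() {}

  static <T> T create(String className, Class<T> type, int saltDivisor, long duration, TimeUnit units) {
    try {
      Class<? extends T> clazz = Class.forName(className).asSubclass(type);
      Constructor<? extends T> ctor = clazz.getConstructor(int.class, long.class, TimeUnit.class);
      return ctor.newInstance(saltDivisor, duration, units);
    } catch (ClassNotFoundException | ClassCastException | NoSuchMethodException e) {
      // the class is missing, has the wrong type, or lacks the expected constructor
      throw new IllegalArgumentException("Cannot create row key builder: " + className, e);
    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
      throw new IllegalStateException("Failed to construct row key builder: " + className, e);
    }
  }
}
```

This still relies on reflection, but the contract is a single, explicit constructor signature rather than an implied setter name.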


---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/metron/pull/622
  
    There was a lot of good discussion on this, but I find it hard to completely summarize everyone's positions, including those of @mattf-horton and @cestella.  Here is my attempt.  Please correct anything that I have misstated.
    
    1. Everyone agreed that a ToC (table of contents) is a useful additional feature for the Profiler.  The decodable row key would be needed in addition to, not instead of, a ToC.
    
    1. In implementing a decodable row key, we do need to plan for future changes in row key format.  This was handled in this PR, but can be improved.
    
    1. The decodable row key feature should be completed **before** a ToC so that the row keys can be used to generate (or regenerate) a ToC on-demand.
    
    1. There were various suggestions made on how to shorten up the row key format.  Some of those I completed on this PR (like using a murmur hash) and others (like using shorts instead of ints) I would need to incorporate in a future PR for a decodable row key.
    
    1. There is a need for a migration tool: a tool that can read the existing row key format and rewrite the same data using a new format.  This tool is necessary even if it cannot be implemented deterministically with the current row key format.  The tool may need hints from the user, like the names of known profiles.
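Item 4 above mentions using shorts instead of ints to shorten the row key. A minimal sketch of short length prefixes, assuming fields stay under the diff's `MAX_FIELD_LENGTH` of 1000 bytes, which fits comfortably in a `short`. `ShortLengthFields` is hypothetical; it saves two bytes per length field compared with the int prefixes in `DecodableRowKeyBuilder`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: length-prefix each variable-length field with a 2-byte short. */
final class ShortLengthFields {

  static final int MAX_FIELD_LENGTH = 1000;

  private ShortLengthFields() {}

  static byte[] encode(String... fields) {
    byte[][] encoded = new byte[fields.length][];
    int capacity = 0;
    for (int i = 0; i < fields.length; i++) {
      encoded[i] = fields[i].getBytes(StandardCharsets.UTF_8);
      if (encoded[i].length > MAX_FIELD_LENGTH) {
        throw new IllegalArgumentException("Field too long: " + encoded[i].length);
      }
      capacity += Short.BYTES + encoded[i].length;
    }
    ByteBuffer buffer = ByteBuffer.allocate(capacity);
    for (byte[] field : encoded) {
      buffer.putShort((short) field.length).put(field);
    }
    return buffer.array();
  }

  /** Reads fieldCount length-prefixed UTF-8 fields; assumes a well-formed key. */
  static String[] decode(byte[] key, int fieldCount) {
    ByteBuffer buffer = ByteBuffer.wrap(key);
    String[] fields = new String[fieldCount];
    for (int i = 0; i < fieldCount; i++) {
      byte[] bytes = new byte[buffer.getShort()];
      buffer.get(bytes);
      fields[i] = new String(bytes, StandardCharsets.UTF_8);
    }
    return fields;
  }
}
```

For example, a profile name and an entity of 8 bytes each take 20 bytes with short prefixes instead of 24 with int prefixes.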
    
    
    Once I compile a summary of these changes, I will close this PR.  All enhancements around this will be implemented on new PRs.


---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by mattf-horton <gi...@git.apache.org>.
Github user mattf-horton commented on the issue:

    https://github.com/apache/metron/pull/622
  
    @cestella, we would not need to keep an index resident in memory.  Most of the time we would just have the active Profiles in memory, exactly as we do today.  You only need to retrieve the Profile by serial number on the rare occasions that you have to decode rowkeys.  That said, it's fine with me to just use the profileHash.  I agree it decreases complexity.


---

[GitHub] metron issue #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/metron/pull/622
  
    >  I don't think our current row key is totally opaque, it just needs a brute-force approach to figure out. Not suitable for interactive queries, but would be acceptable for a one-time pass to build (or re-build) the ToC.
    
    For reference, here is what the existing row key looks like.
    
     salt (16B) + profile name (?) + entity name (?) + groups (?) + time (8B)
    
    How would you decode it?  The salt and the time components have known lengths: 16B and 8B respectively.  But nothing marks where the profile name ends and the entity begins, or where the entity ends and the groups begin.  I can only decode the row key if I already know either the profile name or the entity, which defeats the purpose of being able to decode it.
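A minimal sketch of the brute-force approach against the legacy layout above, assuming the 16-byte salt and 8-byte time component described there, and that profile, entity, and groups are written back-to-back with no length prefixes or delimiters. Given a list of known profile names as hints (as a migration tool might need), it strips the salt and time and tests each known name as a prefix of what remains. Every match is returned, because one profile name can be a prefix of another, and even with the right profile the entity and groups stay fused together. `LegacyRowKeyProbe` and `Candidate` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LegacyRowKeyProbe {

  static final int SALT_LENGTH = 16; // from the legacy layout described above
  static final int TIME_LENGTH = 8;

  /** One possible interpretation of a legacy row key. */
  static final class Candidate {
    final String profile;
    final String entityAndGroups; // no delimiter, so these cannot be separated
    final long time;

    Candidate(String profile, String entityAndGroups, long time) {
      this.profile = profile;
      this.entityAndGroups = entityAndGroups;
      this.time = time;
    }

    @Override
    public String toString() {
      return profile + " | " + entityAndGroups + " | " + time;
    }
  }

  /** Returns every interpretation in which a known profile name prefixes the variable-length middle. */
  static List<Candidate> decodeWithHints(byte[] rowKey, List<String> knownProfiles) {
    List<Candidate> candidates = new ArrayList<>();
    if (rowKey.length < SALT_LENGTH + TIME_LENGTH) {
      return candidates;
    }
    byte[] middle = Arrays.copyOfRange(rowKey, SALT_LENGTH, rowKey.length - TIME_LENGTH);
    long time = ByteBuffer.wrap(rowKey, rowKey.length - TIME_LENGTH, TIME_LENGTH).getLong();
    for (String profile : knownProfiles) {
      byte[] p = profile.getBytes(StandardCharsets.UTF_8);
      if (p.length <= middle.length && Arrays.equals(Arrays.copyOf(middle, p.length), p)) {
        String rest = new String(middle, p.length, middle.length - p.length, StandardCharsets.UTF_8);
        candidates.add(new Candidate(profile, rest, time));
      }
    }
    return candidates;
  }

  public static void main(String[] args) {
    byte[] middle = "bytes-in10.0.0.1weekday".getBytes(StandardCharsets.UTF_8);
    byte[] rowKey = ByteBuffer.allocate(SALT_LENGTH + middle.length + TIME_LENGTH)
        .put(new byte[SALT_LENGTH]) // placeholder salt
        .put(middle)
        .putLong(1234L)
        .array();
    for (Candidate c : decodeWithHints(rowKey, Arrays.asList("bytes", "bytes-in"))) {
      System.out.println(c);
    }
  }
}
```

With the hints `bytes` and `bytes-in`, both match the same key, which is exactly the ambiguity described above.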
    



---

[GitHub] metron pull request #622: METRON-1005 Create Decodable Row Key for Profiler

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/622#discussion_r129306225
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/DecodableRowKeyBuilder.java ---
    @@ -0,0 +1,402 @@
    +/*
    + *
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.hbase;
    +
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.metron.profiler.ProfilePeriod;
    +
    +import java.nio.BufferUnderflowException;
    +import java.nio.ByteBuffer;
    +import java.nio.ByteOrder;
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
    +import static org.apache.metron.profiler.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
    +
    +/**
    + * Responsible for building the row keys used to store profile data in HBase.
    + *
    + * This builder generates decodable row keys.  A decodable row key is one that can be interrogated to extract
    + * the constituent components of that row key.  Given a previously generated row key this builder
    + * can extract the profile name, entity name, group name(s), period duration, and period.
    + *
    + * The row key is composed of the following fields.
    + * <ul>
    + * <li>magic number - Helps to validate the row key.</li>
    + * <li>version - The version number of the row key.</li>
    + * <li>salt - A salt that helps prevent hot-spotting.</li>
    + * <li>profile - The name of the profile.</li>
    + * <li>entity - The name of the entity being profiled.</li>
    + * <li>group(s) - The group(s) used to sort the data in HBase. For example, a group may distinguish between weekends and weekdays.</li>
    + * <li>period - The period in which the measurement was taken. The first period starts at the epoch and increases monotonically.</li>
    + * <li>period duration - The duration of each period in milliseconds.</li>
    + * </ul>
    + */
    +public class DecodableRowKeyBuilder implements RowKeyBuilder {
    +
    +  /**
    +   * Defines the byte order when encoding and decoding the row keys.
    +   *
    +   * Making this configurable is likely not necessary and is left as a practice exercise for the reader. :)
    +   */
    +  private static final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    +
    +  /**
    +   * Defines a sane maximum field length to avoid any shenanigans with oddly encoded row keys.
    +   */
    +  private static final int MAX_FIELD_LENGTH = 1000;
    +
    +  /**
    +   * A magic number embedded in each row key to help validate the row key and byte ordering when decoding.
    +   */
    +  protected static final short MAGIC_NUMBER = 77;
    +
    +  /**
    +   * The version number of the row keys supported by this builder.
    +   */
    +  protected static final byte VERSION = (byte) 1;
    +
    +  /**
    +   * A salt can be prepended to the row key to help prevent hot-spotting.  The salt
    +   * divisor is used to generate the salt.  The salt divisor should be roughly equal
    +   * to the number of nodes in the HBase cluster.
    +   */
    +  private int saltDivisor;
    +
    +  /**
    +   * The duration of each profile period in milliseconds.
    +   */
    +  private long periodDurationMillis;
    +
    +  public DecodableRowKeyBuilder() {
    +    this(PROFILER_SALT_DIVISOR.getDefault(Integer.class),
    +            PROFILER_PERIOD.getDefault(Long.class),
    +            TimeUnit.valueOf(PROFILER_PERIOD_UNITS.getDefault(String.class)));
    +  }
    +
    +  public DecodableRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
    +    this.saltDivisor = saltDivisor;
    +    this.periodDurationMillis = units.toMillis(duration);
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve profile measurements over
    +   * a time horizon.
    +   *
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The group(s) used to sort the profile data.
    +   * @param start When the time horizon starts in epoch milliseconds.
    +   * @param end When the time horizon ends in epoch milliseconds.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, long start, long end) {
    +    // be forgiving of out-of-order start and end times; order is critical to this algorithm
    +    long lowerBound = Math.min(start, end);
    +    end = Math.max(start, end);
    +    start = lowerBound;
    +
    +    // find the starting period and advance until the end time is reached
    +    return ProfilePeriod.visitPeriods( start
    +            , end
    +            , periodDurationMillis
    +            , TimeUnit.MILLISECONDS
    +            , Optional.empty()
    +            , period -> encode(profile, entity, groups, period)
    +    );
    +
    +  }
    +
    +  /**
    +   * Builds a list of row keys necessary to retrieve a profile's measurements over
    +   * a time horizon.
    +   * <p>
    +   * This method is useful when attempting to read ProfileMeasurements stored in HBase.
    +   *
    +   * @param profile    The name of the profile.
    +   * @param entity     The name of the entity.
    +   * @param groups     The group(s) used to sort the profile data.
    +   * @param periods    The profile measurement periods to compute the row keys for.
    +   * @return All of the row keys necessary to retrieve the profile measurements.
    +   */
    +  @Override
    +  public List<byte[]> encode(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
    +    List<byte[]> rowKeys = new ArrayList<>();
    +    for(ProfilePeriod period : periods) {
    +      rowKeys.add(encode(profile, entity, groups, period));
    +    }
    +    return rowKeys;
    +  }
    +
    +  /**
    +   * Builds the row key for a given profile measurement.
    +   * @param m The profile measurement.
    +   * @return The HBase row key.
    +   */
    +  @Override
    +  public byte[] encode(ProfileMeasurement m) {
    +    return encode(m.getProfileName(), m.getEntity(), m.getGroups(), m.getPeriod());
    +  }
    +
    +  /**
    +   * Build the row key.
    +   * @param profile The name of the profile.
    +   * @param entity The name of the entity.
    +   * @param groups The groups.
    +   * @param period The period in which the measurement was taken.
    +   * @return The HBase row key.
    +   */
    +  public byte[] encode(String profile, String entity, List<Object> groups, ProfilePeriod period) {
    +
    +    if(profile == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile name.");
    +    if(entity == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid entity name.");
    +    if(period == null)
    +      throw new IllegalArgumentException("Cannot encode row key; invalid profile period.");
    +
    +    long periodId = period.getPeriod();
    +    long periodDurationMillis = period.getDurationMillis();
    +
    +    byte[] salt = encodeSalt(periodId, saltDivisor);
    +    byte[] profileB = Bytes.toBytes(profile);
    +    byte[] entityB = Bytes.toBytes(entity);
    +    byte[] groupB = encodeGroups(groups);
    +
    +    int capacity = Short.BYTES + 1 + salt.length + profileB.length + entityB.length + groupB.length + (Integer.BYTES * 3) + (Long.BYTES * 2);
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate(capacity)
    +            .order(byteOrder)
    +            .putShort(MAGIC_NUMBER)
    +            .put(VERSION)
    +            .putInt(salt.length)
    +            .put(salt)
    +            .putInt(profileB.length)
    +            .put(profileB)
    +            .putInt(entityB.length)
    +            .put(entityB)
    +            .put(groupB)
    +            .putLong(periodId)
    +            .putLong(periodDurationMillis);
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Decodes a row key to build a ProfileMeasurement containing all of the
    +   * relevant fields that exist within the row key.
    +   *
    +   * @param rowKey The row key to decode.
    +   * @return A ProfileMeasurement.
    +   */
    +  @Override
    +  public ProfileMeasurement decode(byte[] rowKey) {
    +    ByteBuffer buffer = ByteBuffer
    +            .wrap(rowKey)
    +            .order(byteOrder);
    +
    +    try {
    +      // validate the magic number
    +      short magicNumber = buffer.getShort();
    +      if(magicNumber != MAGIC_NUMBER) {
    +        throw new IllegalArgumentException(String.format("Invalid magic number; expected '%s', got '%s'", MAGIC_NUMBER, magicNumber));
    +      }
    +
    +      // validate the row key version
    +      byte version = buffer.get();
    +      if(version != VERSION) {
    +        throw new IllegalArgumentException(String.format("Invalid version; expected '%s', got '%s'", VERSION, version));
    +      }
    +
    +      // validate the salt length
    +      int saltLength = buffer.getInt();
    +      if (saltLength <= 0 || saltLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid salt length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, saltLength));
    +      }
    +
    +      // decode the salt
    +      byte[] salt = new byte[saltLength];
    +      buffer.get(salt);
    +
    +      // validate the profile length
    +      int profileLength = buffer.getInt();
    +      if (profileLength <= 0 || profileLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid profile length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, profileLength));
    +      }
    +
    +      // decode the profile name
    +      byte[] profileBytes = new byte[profileLength];
    +      buffer.get(profileBytes);
    +      String profile = Bytes.toString(profileBytes);  // UTF-8, matching Bytes.toBytes() in encode
    +
    +      // validate the entity length
    +      int entityLength = buffer.getInt();
    +      if (entityLength <= 0 || entityLength > MAX_FIELD_LENGTH) {
    +        throw new IllegalArgumentException(String.format("Invalid entity length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, entityLength));
    +      }
    +
    +      // decode the entity
    +      byte[] entityBytes = new byte[entityLength];
    +      buffer.get(entityBytes);
    +      String entity = Bytes.toString(entityBytes);
    +
    +      // decode the groups
    +      List<Object> groups = new ArrayList<>();
    +      int numberOfGroups = buffer.getInt();
    +      for (int i = 0; i < numberOfGroups; i++) {
    +
    +        // validate the group length
    +        int groupLength = buffer.getInt();
    +        if (groupLength <= 0 || groupLength > MAX_FIELD_LENGTH) {
    +          throw new IllegalArgumentException(String.format("Invalid group length; max allowed '%d', got '%d'", MAX_FIELD_LENGTH, groupLength));
    +        }
    +
    +        // decode the group
    +        byte[] groupBytes = new byte[groupLength];
    +        buffer.get(groupBytes);
    +
    +        String group = Bytes.toString(groupBytes);
    +        groups.add(group);
    +      }
    +
    +      // decode the period
    +      long periodId = buffer.getLong();
    +      long duration = buffer.getLong();
    +      ProfilePeriod period = ProfilePeriod.buildFromPeriod(periodId, duration, TimeUnit.MILLISECONDS);
    +
    +      return new ProfileMeasurement()
    +              .withProfileName(profile)
    +              .withEntity(entity)
    +              .withGroups(groups)
    +              .withProfilePeriod(period);
    +
    +    } catch(BufferUnderflowException e) {
    +      throw new IllegalArgumentException("Unable to decode the row key", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Builds the 'group' component of the row key.
    +   * @param groups The groups to include in the row key.
    +   */
    +  private byte[] encodeGroups(List<Object> groups) {
    +
    +    // encode each of the groups and determine their size
    +    int lengthOfGroups = 0;
    +    List<byte[]> groupBytes = new ArrayList<>();
    +    for(Object group : groups) {
    +      byte[] groupB = Bytes.toBytes(String.valueOf(group));
    +      groupBytes.add(groupB);
    +      lengthOfGroups += groupB.length;
    +    }
    +
    +    // write the number of groups, then each group prefixed by its length
    +    ByteBuffer buffer = ByteBuffer
    +            .allocate((Integer.BYTES * (1 + groups.size())) + lengthOfGroups)
    +            .order(byteOrder)
    +            .putInt(groups.size());
    +
    +    for(byte[] groupB : groupBytes) {
    +      buffer.putInt(groupB.length).put(groupB);
    +    }
    +
    +    return buffer.array();
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period The ProfilePeriod in which the ProfileMeasurement was taken.
    +   */
    +  private static byte[] encodePeriod(ProfilePeriod period) {
    +    return encodePeriod(period.getPeriod());
    +  }
    +
    +  /**
    +   * Builds the 'time' portion of the row key
    +   * @param period the period
    +   */
    +  private static byte[] encodePeriod(long period) {
    +    return Bytes.toBytes(period);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'toShort(md5(period)) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the HBase cluster.
    +   *
    +   * @param period The period in which a profile measurement is taken.
    +   */
    +  public static byte[] encodeSalt(ProfilePeriod period, int saltDivisor) {
    +    return encodeSalt(period.getPeriod(), saltDivisor);
    +  }
    +
    +  /**
    +   * Calculates a salt value that is used as part of the row key.
    +   *
    +   * The salt is calculated as 'toShort(md5(period)) % N' where N is a configurable value that ideally
    +   * is close to the number of nodes in the HBase cluster.
    +   *
    +   * @param period The period
    +   * @param saltDivisor The salt divisor
    +   */
    +  public static byte[] encodeSalt(long period, int saltDivisor) {
    +    try {
    +      // an MD5 is 16 bytes aka 128 bits
    +      MessageDigest digest = MessageDigest.getInstance("MD5");
    +      byte[] hash = digest.digest(encodePeriod(period));
    +      int salt = Bytes.toShort(hash) % saltDivisor;
    +      return Bytes.toBytes(salt);
    --- End diff --
    
    The downside of salting, unfortunately, is that scanning over a time range becomes difficult.  While we don't do scans now, I think expanding the Profiler to more use cases will require us to do scans.  
    
    With a salt, you have to submit multiple scans, one for every possible salt value, and then merge what you get back.  
    
    That is why I was originally against using a salt.  But, of course, a salt has the advantage that @cestella outlined.
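A minimal sketch of the fan-out described above, using an in-memory `TreeMap` ordered like HBase row keys (unsigned lexicographic bytes) as a stand-in for the table, so no HBase client API is assumed. Each row key here is just a 4-byte salt followed by an 8-byte period id (assumed non-negative); the scan issues one range read per salt value and merges the per-salt results with a priority queue. The salt function (`Math.floorMod` of the period's hash) is an assumption chosen to keep salts in `[0, N)`. With the diff's `Bytes.toShort(hash) % saltDivisor`, a negative short yields a negative salt, so a real fan-out would have to cover `2N - 1` values. `SaltedRangeScan` and its helpers are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.PriorityQueue;
import java.util.TreeMap;

public class SaltedRangeScan {

  /** Unsigned lexicographic byte[] ordering, the order HBase uses for row keys. */
  static final Comparator<byte[]> ROW_ORDER = (a, b) -> {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) return cmp;
    }
    return Integer.compare(a.length, b.length);
  };

  /** Hypothetical salt function; floorMod keeps every salt in [0, divisor). */
  static int salt(long period, int divisor) {
    return Math.floorMod(Long.hashCode(period), divisor);
  }

  static byte[] rowKey(int salt, long period) {
    return ByteBuffer.allocate(Integer.BYTES + Long.BYTES).putInt(salt).putLong(period).array();
  }

  static long periodOf(byte[] rowKey) {
    return ByteBuffer.wrap(rowKey, Integer.BYTES, Long.BYTES).getLong();
  }

  /** The current position within one salt's scan. */
  static final class Cursor {
    final Iterator<byte[]> keys;
    long current;

    Cursor(Iterator<byte[]> keys) {
      this.keys = keys;
    }

    boolean advance() {
      if (!keys.hasNext()) return false;
      current = periodOf(keys.next());
      return true;
    }
  }

  /** One range scan per salt value, then a k-way merge so results come back in period order. */
  static List<Long> scan(NavigableMap<byte[], String> table, int divisor, long start, long end) {
    PriorityQueue<Cursor> heap = new PriorityQueue<>(Comparator.comparingLong((Cursor c) -> c.current));
    for (int s = 0; s < divisor; s++) {
      // the scan for salt s covers [s|start, s|end], inclusive on both ends
      Cursor cursor = new Cursor(table.subMap(rowKey(s, start), true, rowKey(s, end), true).keySet().iterator());
      if (cursor.advance()) heap.add(cursor);
    }
    List<Long> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      Cursor c = heap.poll();
      merged.add(c.current);
      if (c.advance()) heap.add(c);
    }
    return merged;
  }

  public static void main(String[] args) {
    int divisor = 4;
    NavigableMap<byte[], String> table = new TreeMap<>(ROW_ORDER);
    for (long period = 100; period < 120; period++) {
      table.put(rowKey(salt(period, divisor), period), "measurement-" + period);
    }
    System.out.println(scan(table, divisor, 105, 110));
  }
}
```

Without the salt, the same query would be a single contiguous range read; the price of spreading writes across regions is this N-way scan and merge on every time-range read.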


---