You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ed...@apache.org on 2021/09/28 22:47:08 UTC
[accumulo] branch main updated: Versioned Properties - refactored
to address PR comments (#2224)
This is an automated email from the ASF dual-hosted git repository.
edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 879977c Versioned Properties - refactored to address PR comments (#2224)
879977c is described below
commit 879977c8269bf1a029e7f8f0cf7847461f08f6d4
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Tue Sep 28 18:47:02 2021 -0400
Versioned Properties - refactored to address PR comments (#2224)
Adds VersionedProperties to manage related properties as a single entity.
Versioned properties are intended to be store and managed properties as a group (system, namespace, table) stored on a single ZooKeeper node rather than node / individual properties. The versioning is to allow efficient checking of cached entries vs stored entries.
- Encoding / Decoding with optional compression
- Test example that allows for encryption
This is the first step in replacing property management from single properties (in ZooKeeper) to storing related
properties on a single ZooKeeper node.
---
.../server/conf/codec/EncodingOptions.java | 108 ++++++++
.../server/conf/codec/VersionedPropCodec.java | 285 +++++++++++++++++++++
.../server/conf/codec/VersionedPropGzipCodec.java | 91 +++++++
.../server/conf/codec/VersionedProperties.java | 237 +++++++++++++++++
.../server/conf/codec/VersionedPropCodecTest.java | 70 +++++
.../conf/codec/VersionedPropEncryptCodec.java | 241 +++++++++++++++++
.../conf/codec/VersionedPropEncryptCodecTest.java | 228 +++++++++++++++++
.../conf/codec/VersionedPropGzipCodecTest.java | 132 ++++++++++
.../server/conf/codec/VersionedPropertiesTest.java | 182 +++++++++++++
9 files changed, 1574 insertions(+)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/codec/EncodingOptions.java b/server/base/src/main/java/org/apache/accumulo/server/conf/codec/EncodingOptions.java
new file mode 100644
index 0000000..d8efa54
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/codec/EncodingOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.StringJoiner;
+
+/**
+ * Serialization metadata to allow for evolution of the encoding used for property storage. This
+ * info is expected to be stored first in the serialization and uncompressed so that the handling of
+ * subsequent fields and data can be processed correctly and without additional processing.
+ * <p>
+ * Instances of this class are immutable.
+ */
+public class EncodingOptions {
+
+ // Adding an encoding version must be done as an addition. Do not change or delete previous
+ // version numbers - versions 999 and above reserved for testing
+ public static final int EncodingVersion_1_0 = 1;
+
+ private final int encodingVersion;
+ private final boolean compress;
+
+ EncodingOptions(final int encodingVersion, final boolean compress) {
+ this.encodingVersion = encodingVersion;
+ this.compress = compress;
+ }
+
+ /**
+ * Instantiate encoding options to use version 1.0 encoding settings.
+ *
+ * @param compress
+ * when true compress the property map.
+ * @return the encoding options.
+ */
+ public static EncodingOptions V1_0(final boolean compress) {
+ return new EncodingOptions(EncodingVersion_1_0, compress);
+ }
+
+ /**
+ * Instantiate an instance of EncodingOptions reading the values from an input stream. Typically,
+ * the stream will be obtained from reading a byte array from a data store and then creating a
+ * stream that reads from that array,
+ *
+ * @param dis
+ * a data input stream
+ * @throws IOException
+ * if an exception occurs reading from the input stream.
+ */
+ public static EncodingOptions fromDataStream(final DataInputStream dis) throws IOException {
+ return new EncodingOptions(dis.readInt(), dis.readBoolean());
+ }
+
+ /**
+ * Write the values to a data stream.
+ *
+ * @param dos
+ * a data output stream
+ * @throws IOException
+ * if an exception occurs writing the data stream.
+ */
+ public void encode(final DataOutputStream dos) throws IOException {
+ dos.writeInt(encodingVersion);
+ dos.writeBoolean(compress);
+ }
+
+ /**
+ * get the encoding version of the instance,
+ *
+ * @return the encoding version
+ */
+ public int getEncodingVersion() {
+ return encodingVersion;
+ }
+
+ /**
+ * get if the compress is set.
+ *
+ * @return true if the payload is compressed, false if not.
+ */
+ public boolean isCompressed() {
+ return compress;
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", EncodingOptions.class.getSimpleName() + "[", "]")
+ .add("encodingVersion=" + encodingVersion).add("compress=" + compress).toString();
+ }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedPropCodec.java b/server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedPropCodec.java
new file mode 100644
index 0000000..43a2c4f
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedPropCodec.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.apache.accumulo.server.conf.codec.VersionedProperties.tsFormatter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Abstract class to provide encoding / decoding of versioned properties. This class handles the
+ * serialization of the metadata and subclasses are required to implement
+ * {@link #encodePayload(OutputStream, VersionedProperties, EncodingOptions)} and
+ * {@link #decodePayload(InputStream, EncodingOptions)} to handle any specific implementation
+ * metadata (optional) and the property map according to the encoding scheme of the subclass.
+ * <p>
+ * The basic encoding format:
+ * <ul>
+ * <li>encoding metadata - specifies codec to be used</li>
+ * <li>version metadata - specifies property versioning information</li>
+ * <li>codec specific metadata (optional)</li>
+ * <li>the property map</li>
+ * </ul>
+ *
+ */
+public abstract class VersionedPropCodec {
+
+ private final EncodingOptions encodingOpts;
+
+ public VersionedPropCodec(final EncodingOptions encodingOpts) {
+ this.encodingOpts = encodingOpts;
+ }
+
+ /**
+ * The general encoding options that apply to all encodings.
+ *
+ * @return the general options.
+ */
+ public EncodingOptions getEncodingOpts() {
+ return encodingOpts;
+ }
+
+ /**
+ * Serialize the versioned properties. The version information on the properties is updated if the
+ * data is successfully serialized.
+ *
+ * @param vProps
+ * the versioned properties.
+ * @return a byte array with the serialized properties.
+ */
+ public byte[] toBytes(final VersionedProperties vProps) throws IOException {
+
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos)) {
+
+ // write encoding metadata
+ encodingOpts.encode(dos);
+
+ // write version metadata
+ DataVersionInfo vMetadata =
+ new DataVersionInfo(vProps.getNextVersion(), vProps.getTimestamp());
+ vMetadata.write(dos);
+
+ // delegate property encoding to sub-class
+ encodePayload(bos, vProps, encodingOpts);
+
+ return bos.toByteArray();
+ }
+ }
+
+ /**
+ * Encode the properties and optionally any specific encoding metadata that is necessary to decode
+ * the payload with the scheme chosen.
+ *
+ * @param out
+ * an output stream
+ * @param vProps
+ * the versioned properties
+ * @param encodingOpts
+ * the general encoding options.
+ * @throws IOException
+ * if an error occurs writing to the underlying output stream.
+ */
+ abstract void encodePayload(final OutputStream out, final VersionedProperties vProps,
+ final EncodingOptions encodingOpts) throws IOException;
+
+ public VersionedProperties fromBytes(final byte[] bytes) throws IOException {
+
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ DataInputStream dis = new DataInputStream(bis)) {
+
+ EncodingOptions encodingOpts = EncodingOptions.fromDataStream(dis);
+
+ if (!checkCanDecodeVersion(encodingOpts)) {
+ throw new IllegalArgumentException(
+ "Invalid data version - cannot process the version read: "
+ + encodingOpts.getEncodingVersion());
+ }
+
+ DataVersionInfo vMetadata = DataVersionInfo.fromDataStream(dis);
+
+ Map<String,String> props = decodePayload(bis, encodingOpts);
+
+ return new VersionedProperties(vMetadata.getDataVersion(), vMetadata.getTimestamp(), props);
+ }
+ }
+
+ abstract boolean checkCanDecodeVersion(final EncodingOptions encodingOpts);
+
+ /**
+ * Extracts the encoding version from the encoded byte array without fully decoding the payload.
+ * This is a convenience method if multiple encodings are present, and should only be required if
+ * upgrading / changing encodings, otherwise a single encoding should be in operation for an
+ * instance at any given time.
+ *
+ * @param bytes
+ * serialized encoded versioned property byte array.
+ * @return the encoding version used to serialize the properties.
+ */
+ public static int getEncodingVersion(final byte[] bytes) {
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ DataInputStream dis = new DataInputStream(bis)) {
+ return EncodingOptions.fromDataStream(dis).getEncodingVersion();
+ } catch (NullPointerException | IOException ex) {
+ throw new IllegalArgumentException("Failed to read encoding version from byte array provided",
+ ex);
+ }
+ }
+
+ /**
+ * Extracts the data version from the encoded byte array without fully decoding the payload.
+ * Normally the data version should be obtained from a fully decoded instance of the versioned
+ * properties.
+ * <p>
+ * The cost of reading the byte array from the backing store should be considered verses the
+ * additional cost of decoding - with a goal of reducing data reads from the store preferred.
+ * Generally reading from the store will be followed by some sort of usage which would require the
+ * full decode operation anyway.
+ *
+ * @param bytes
+ * serialized encoded versioned property byte array.
+ * @return the encoding version used to serialize the properties.
+ */
+ public static int getDataVersion(final byte[] bytes) {
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ DataInputStream dis = new DataInputStream(bis)) {
+ // skip encoding metadata
+ EncodingOptions.fromDataStream(dis);
+ return DataVersionInfo.fromDataStream(dis).getDataVersion();
+ } catch (NullPointerException | IOException ex) {
+ throw new IllegalArgumentException(
+ "Failed to read data version version from byte array provided", ex);
+ }
+ }
+
+ /**
+ * Decode the payload and any optional encoding specific metadata and return a map of the property
+ * name, value pairs.
+ *
+ * @param inStream
+ * an input stream
+ * @param encodingOpts
+ * the general encoding options.
+ * @return a map of properties name, value pairs.
+ * @throws IOException
+ * if an exception occurs reading from the input stream.
+ */
+ abstract Map<String,String> decodePayload(final InputStream inStream,
+ final EncodingOptions encodingOpts) throws IOException;
+
+ /**
+ * Read the property map from a data input stream as UTF strings. The input stream should be
+ * created configured by sub-classes for the output of the sub-class. If the sub-class uses an
+ * encoding other that UTF strings, they should override this method. An example would be an
+ * encoding that uses JSON to encode the map.
+ * <p>
+ * The handling the properties as UTF strings is one implementation. Subclasses can implement
+ * different mechanism if desired, one example might be using a JSON implementation to encode /
+ * decode the properties.
+ *
+ * @param dis
+ * a data input stream
+ * @return the property map
+ * @throws IOException
+ * if an exception occurs reading from the stream.
+ */
+ Map<String,String> readMapAsUTF(DataInputStream dis) throws IOException {
+
+ Map<String,String> aMap = new HashMap<>();
+ int items = dis.readInt();
+
+ for (int i = 0; i < items; i++) {
+ String k = dis.readUTF();
+ String v = dis.readUTF();
+ aMap.put(k, v);
+ }
+ return aMap;
+ }
+
+ /**
+ * Write the property map to the data output stream. The underlying stream is not closed by this
+ * method.
+ * <p>
+ * The handling the properties as UTF strings is one implementation. Subclasses can implement
+ * different mechanism if desired, one example might be using a JSON implementation to encode /
+ * decode the properties.
+ *
+ * @param dos
+ * a data output stream
+ * @param aMap
+ * the property map of k, v string pairs.
+ * @throws IOException
+ * if an exception occurs.
+ */
+ void writeMapAsUTF(final DataOutputStream dos, final Map<String,String> aMap) throws IOException {
+
+ dos.writeInt(aMap.size());
+
+ for (Map.Entry<String,String> e : aMap.entrySet()) {
+ dos.writeUTF(e.getKey());
+ dos.writeUTF(e.getValue());
+ }
+ dos.flush();
+ }
+
+ /**
+ * Helper class for reading / writing versioned properties metadata.
+ */
+ static class DataVersionInfo {
+ private final int dataVersion;
+ private final Instant timestamp;
+
+ public DataVersionInfo(final int dataVersion, final Instant timestamp) {
+ this.dataVersion = dataVersion;
+ this.timestamp = timestamp;
+ }
+
+ public static DataVersionInfo fromDataStream(final DataInputStream dis) throws IOException {
+ try {
+ var dataVersion = dis.readInt();
+ var timestamp = tsFormatter.parse(dis.readUTF(), Instant::from);
+ return new DataVersionInfo(dataVersion, timestamp);
+ } catch (Exception ex) {
+ throw new IOException("Could not parse data version info", ex);
+ }
+ }
+
+ public int getDataVersion() {
+ return dataVersion;
+ }
+
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ public void write(final DataOutputStream dos) throws IOException {
+ dos.writeInt(dataVersion);
+ dos.writeUTF(tsFormatter.format(timestamp));
+ }
+ }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedPropGzipCodec.java b/server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedPropGzipCodec.java
new file mode 100644
index 0000000..139469d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedPropGzipCodec.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Initial property encoding that (optionally) uses gzip to compress the property map. The encoding
+ * version supported is EncodingVersion.V1_0.
+ */
+public class VersionedPropGzipCodec extends VersionedPropCodec {
+
+ private VersionedPropGzipCodec(final EncodingOptions encodingOpts) {
+ super(encodingOpts);
+ }
+
+ public static VersionedPropCodec codec(final boolean compress) {
+ return new VersionedPropGzipCodec(EncodingOptions.V1_0(compress));
+ }
+
+ @Override
+ void encodePayload(final OutputStream out, final VersionedProperties vProps,
+ final EncodingOptions encodingOpts) throws IOException {
+
+ Map<String,String> props = vProps.getProperties();
+
+ if (getEncodingOpts().isCompressed()) {
+ // Write the property map to the output stream, compressing the output using GZip
+ try (GZIPOutputStream gzipOut = new GZIPOutputStream(out);
+ DataOutputStream zdos = new DataOutputStream(gzipOut)) {
+
+ writeMapAsUTF(zdos, props);
+
+ // finalize compression
+ gzipOut.flush();
+ gzipOut.finish();
+ }
+ } else {
+ try (DataOutputStream dos = new DataOutputStream(out)) {
+ writeMapAsUTF(dos, props);
+ }
+ }
+
+ }
+
+ @Override
+ boolean checkCanDecodeVersion(final EncodingOptions encodingOpts) {
+ return encodingOpts.getEncodingVersion() == EncodingOptions.EncodingVersion_1_0;
+ }
+
+ Map<String,String> decodePayload(final InputStream inStream, final EncodingOptions encodingOpts)
+ throws IOException {
+ // read the property map keys, values
+ Map<String,String> aMap;
+ if (encodingOpts.isCompressed()) {
+ // Read and uncompress an input stream compressed with GZip
+ try (GZIPInputStream gzipIn = new GZIPInputStream(inStream);
+ DataInputStream zdis = new DataInputStream(gzipIn)) {
+ aMap = readMapAsUTF(zdis);
+ }
+ } else {
+ try (DataInputStream dis = new DataInputStream(inStream)) {
+ aMap = readMapAsUTF(dis);
+ }
+ }
+ return aMap;
+ }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java b/server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java
new file mode 100644
index 0000000..d64254e
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static java.util.Objects.requireNonNull;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Version properties maintain a {@code Map<String,String>}; of property k,v pairs along with
+ * versioning information metadata.
+ * <p>
+ * The metadata used to verify cached values match stored values. Storing the metadata with the
+ * properties allows for comparison of properties and can be used to ensure that values being
+ * written to the backend store have not changed. This metadata should be written / appear early in
+ * the encoded bytes and be uncompressed so that decisions can be made that may make deserialization
+ * unnecessary.
+ * <p>
+ * Note: Avoid using -1 because that has significance in ZooKeeper - writing a ZooKeeper node with a
+ * version of -1 disables the ZooKeeper expected version checking and just overwrites the node.
+ * <p>
+ * Instances of this class are immutable.
+ */
+public class VersionedProperties {
+
+ public static final DateTimeFormatter tsFormatter =
+ DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
+ // flag value for initialization - on store both the version and next version should be 0.
+ private static final int NO_VERSION = -2;
+ private final int dataVersion;
+ private final Instant timestamp;
+ private final Map<String,String> props;
+
+ /**
+ * Instantiate an initial instance with default version info and empty map.
+ */
+ public VersionedProperties() {
+ this(Map.of());
+ }
+
+ /**
+ * Instantiate an initial instance with default version info and provided property map.
+ *
+ * @param props
+ * optional map of initial property key, value pairs. The properties are assumed to have
+ * been previously validated (if required)
+ */
+ public VersionedProperties(Map<String,String> props) {
+ this(NO_VERSION, Instant.now(), props);
+ }
+
+ /**
+ * Instantiate an instance and set the initial properties to the provided values.
+ *
+ * @param dataVersion
+ * version info with data version and timestamp.
+ * @param timestamp
+ * timestamp of this version.
+ * @param props
+ * optional map of initial property key, value pairs. The properties are assumed to have
+ * been previously validated (if required)
+ */
+ public VersionedProperties(final int dataVersion, final Instant timestamp,
+ final Map<String,String> props) {
+ this.dataVersion = dataVersion;
+ this.timestamp = requireNonNull(timestamp, "A timestamp must be supplied");
+ this.props = props == null ? Map.of() : Map.copyOf(props);
+ }
+
+ /**
+ * Get an unmodifiable map with all property key,value pairs.
+ *
+ * @return An unmodifiable view of the property key, value pairs.
+ */
+ public Map<String,String> getProperties() {
+ return props;
+ }
+
+ /**
+ * Get the current data version. The version should match the node version of the stored data. The
+ * value should be used on data writes as the expected version. If the data write fails do to an
+ * unexpected version, it signals that the node version has changed since the instance was
+ * instantiated and encoded.
+ *
+ * @return 0 for initial version, otherwise the data version when the properties were serialized.
+ */
+ public int getDataVersion() {
+ return Math.max(dataVersion, 0);
+ }
+
+ /**
+ * Calculates the version that should be stored when serialized. The serialized version, when
+ * stored, should match the version that will be assigned. This way, data reading the serialized
+ * version can compare the stored version with the node version at any time to detect if the node
+ * version has been updated.
+ * <p>
+ * The initialization of the data version to a negative value allows this value to be calculated
+ * correctly for the first serialization. On the first store, the expected version will be 0.
+ *
+ * @return the next version number that should be serialized, or 0 if this is the initial version.
+ */
+ public int getNextVersion() {
+ return Math.max(dataVersion + 1, 0);
+ }
+
+ /**
+ * The timestamp of the instance when created or last modified.
+ *
+ * @return the timestamp of the instance.
+ */
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ /**
+ * The timestamp formatted as an ISO 8601 string with format of
+ * {@code YYYY-MM-DDTHH:mm:ss.SSSSSSZ}
+ *
+ * @return a formatted timestamp string.
+ */
+ public String getTimestampISO() {
+ return tsFormatter.format(timestamp);
+ }
+
+ /**
+ * Update a single property. If a property already exists it is overwritten.
+ * <p>
+ * It is much more efficient to add multiple properties at a time rather than one by one.
+ * <p>
+ * Because instances of this class are immutable, this method creates a new copy of the
+ * properties. Other processes will continue to see original values retrieved from the data store.
+ * Other processes will receive an update when the instance is encoded and stored in the data
+ * store and then retrieved with the normal store update mechanisms.
+ *
+ * @param key
+ * the property name.
+ * @param value
+ * the property value.
+ * @return A new instance of this class with the property added or updated.
+ */
+ public VersionedProperties addOrUpdate(final String key, final String value) {
+ var updated = new HashMap<>(props);
+ updated.put(key, value);
+ return new VersionedProperties(dataVersion, Instant.now(), updated);
+ }
+
+ /**
+ * Add or update multiple properties. If a property already exists it is overwritten.
+ * <p>
+ * Because instances of this class are immutable, this method creates a new copy of the
+ * properties. Other processes will continue to see original values retrieved from the data store.
+ * Other processes will receive an update when the instance is encoded and stored in the data
+ * store and then retrieved with the normal store update mechanisms.
+ *
+ * @param updates
+ * A map of key, values pairs.
+ * @return A new instance of this class with the properties added or updated.
+ */
+ public VersionedProperties addOrUpdate(final Map<String,String> updates) {
+ var updated = new HashMap<>(props);
+ updated.putAll(updates);
+ return new VersionedProperties(dataVersion, Instant.now(), updated);
+ }
+
+ /**
+ * Delete multiple properties provided as a collection of keys.
+ * <p>
+ * Because instances of this class are immutable, this method creates a new copy of the
+ * properties. Other processes will continue to see original values retrieved from the data store.
+ * Other processes will receive an update when the instance is encoded and stored in the data
+ * store and then retrieved with the normal store update mechanisms.
+ *
+ * @param keys
+ * a collection of the keys that if they exist, will be removed.
+ * @return A new instance of this class.
+ */
+ public VersionedProperties remove(Collection<String> keys) {
+ var updated = new HashMap<>(props);
+ updated.keySet().removeAll(keys);
+ return new VersionedProperties(dataVersion, Instant.now(), updated);
+ }
+
+ /**
+ * Generate a formatted string for debugging, either as a single line or human-friendly,
+ * multi-line format.
+ *
+ * @param prettyPrint
+ * if true, generate human-friendly string
+ * @return a formatted string
+ */
+ public String print(boolean prettyPrint) {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("dataVersion=").append(dataVersion).append(prettyPrint ? "\n" : ", ");
+
+ sb.append("timeStamp=").append(tsFormatter.format(timestamp)).append(prettyPrint ? "\n" : ", ");
+
+ Map<String,String> sorted = new TreeMap<>(props);
+ sorted.forEach((k, v) -> {
+ if (prettyPrint) {
+ // indent if pretty
+ sb.append(" ");
+ }
+ sb.append(k).append("=").append(v);
+ sb.append(prettyPrint ? "\n" : ", ");
+ });
+ return sb.toString();
+ }
+
+ @Override
+ public String toString() {
+ return print(false);
+ }
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropCodecTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropCodecTest.java
new file mode 100644
index 0000000..6674dcf
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropCodecTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Map;
+
+import org.junit.Test;
+
+/**
+ * Exercise the base class specific methods - most testing will occur in subclasses
+ */
+public class VersionedPropCodecTest {
+
+ @Test
+ public void invalidEncodingNullArray() {
+ assertThrows(IllegalArgumentException.class, () -> VersionedPropCodec.getEncodingVersion(null));
+ }
+
+ @Test
+ public void validEncoding() {
+ // length so that array reads do not error
+ byte[] bytes = new byte[100];
+ int encodingVersion = VersionedPropCodec.getEncodingVersion(bytes);
+ assertEquals(0, encodingVersion);
+ }
+
+ /**
+ * The timestamp will be invalid - this should cause a timestamp parse error that will be remapped
+ * to an IllegalArgumentException.
+ */
+ @Test
+ public void getDataVersionBadTimestamp() {
+ // length so that array reads do not error
+ byte[] bytes = new byte[100];
+ assertThrows(IllegalArgumentException.class, () -> VersionedPropCodec.getDataVersion(bytes));
+ }
+
+ @Test
+ public void goPath() throws IOException {
+ int aVersion = 13;
+ VersionedProperties vProps =
+ new VersionedProperties(aVersion, Instant.now(), Map.of("k1", "v1"));
+
+ VersionedPropCodec codec = VersionedPropGzipCodec.codec(true);
+ byte[] encodedBytes = codec.toBytes(vProps);
+
+ assertEquals(aVersion + 1, VersionedPropCodec.getDataVersion(encodedBytes));
+ }
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodec.java b/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodec.java
new file mode 100644
index 0000000..db2605a
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodec.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.KeySpec;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.SecretKey;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.PBEKeySpec;
+import javax.crypto.spec.SecretKeySpec;
+
+/**
+ * EXPERIMENTAL - demonstrates using an alternate encoding scheme. The sample is completely
+ * functional, however, certain elements such as password / key handling may not be suitable for
+ * production. The encoding version is EXPERIMENTAL_CIPHER_ENCODING_1_0.
+ * <p>
+ * This codec uses AES algorithm in GCM mode for encryption to encode the property map that is
+ * stored in the external store.
+ */
+public class VersionedPropEncryptCodec extends VersionedPropCodec {
+
+ // testing version (999 or higher)
+ public static final int EXPERIMENTAL_CIPHER_ENCODING_1_0 = 999;
+
+ public static final String CRYPT_ALGORITHM = "AES/GCM/NoPadding";
+
+ private final GCMCipherParams cipherParams;
+
+ private VersionedPropEncryptCodec(final EncodingOptions encodingOpts,
+ final GCMCipherParams cipherParams) {
+ super(encodingOpts);
+
+ this.cipherParams = cipherParams;
+ }
+
+ /**
+ * Instantiate a versioned property codec.
+ *
+ * @param compress
+ * if true, compress the payload
+ * @param cipherParams
+ * the parameters needed for AES GCM encryption.
+ * @return a codec for encoding / decoding versioned properties.
+ */
+ public static VersionedPropCodec codec(final boolean compress,
+ final GCMCipherParams cipherParams) {
+ return new VersionedPropEncryptCodec(
+ new EncodingOptions(EXPERIMENTAL_CIPHER_ENCODING_1_0, compress), cipherParams);
+ }
+
+ @Override
+ void encodePayload(final OutputStream out, final VersionedProperties vProps,
+ final EncodingOptions encodingOpts) throws IOException {
+
+ Cipher cipher;
+
+ try {
+ cipher = Cipher.getInstance(CRYPT_ALGORITHM);
+ cipher.init(Cipher.ENCRYPT_MODE, cipherParams.getSecretKey(),
+ cipherParams.getParameterSpec());
+ } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException
+ | InvalidAlgorithmParameterException ex) {
+ throw new IllegalStateException("Could not get cipher", ex);
+ }
+
+ try (DataOutputStream dos = new DataOutputStream(out)) {
+
+ // write codec specific metadata for decryption.
+ byte[] iv = cipherParams.parameterSpec.getIV();
+ dos.writeInt(iv.length);
+ dos.write(iv);
+
+ }
+
+ Map<String,String> props = vProps.getProperties();
+
+ // encode the property map to an internal byte array.
+ byte[] bytes;
+ if (encodingOpts.isCompressed()) {
+ try (ByteArrayOutputStream ba = new ByteArrayOutputStream();
+ GZIPOutputStream gzipOut = new GZIPOutputStream(ba);
+ DataOutputStream dos = new DataOutputStream(gzipOut)) {
+
+ writeMapAsUTF(dos, props);
+
+ // finalize the compression.
+ gzipOut.flush();
+ gzipOut.finish();
+
+ bytes = ba.toByteArray();
+ }
+
+ } else {
+ try (ByteArrayOutputStream ba = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(ba)) {
+ writeMapAsUTF(dos, props);
+ bytes = ba.toByteArray();
+ }
+ }
+
+ // encrypt the internal byte array and write to provided output stream
+ try (CipherOutputStream cos = new CipherOutputStream(out, cipher)) {
+ cos.write(bytes);
+ }
+
+ }
+
+ @Override
+ boolean checkCanDecodeVersion(EncodingOptions encodingOpts) {
+ return encodingOpts.getEncodingVersion() == EXPERIMENTAL_CIPHER_ENCODING_1_0;
+ }
+
+ /**
+ * Decodes the encryption specific metadata and then the map of properties. The encryption
+ * metadata is the initialization vector used to encrypt the properties. The use of a random
+ * initialization vector on encryption creates different encrypted values on each write even
+ * though the same key is being used.
+ *
+ * @param inStream
+ * an input stream
+ * @param encodingOpts
+ * the general encoding options.
+ * @return a map of property name, value pairs.
+ * @throws IOException
+ * if an error occurs reading from the input stream.
+ */
+ @Override
+ Map<String,String> decodePayload(InputStream inStream, EncodingOptions encodingOpts)
+ throws IOException {
+
+ Cipher cipher;
+
+ try (DataInputStream dis = new DataInputStream(inStream)) {
+
+ // read encryption specific metadata (initialization vector)
+ int ivLen = dis.readInt();
+ byte[] iv = new byte[ivLen];
+ int read = dis.read(iv, 0, ivLen);
+ if (read != ivLen) {
+ throw new IllegalStateException("Could not read data stream (reading iv array) expected "
+ + ivLen + ", received " + read);
+ }
+
+ // init cipher for decryption using initialization vector just read.
+ try {
+ cipher = Cipher.getInstance(CRYPT_ALGORITHM);
+ cipher.init(Cipher.DECRYPT_MODE, cipherParams.getSecretKey(),
+ GCMCipherParams.buildGCMParameterSpec(iv));
+ } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException
+ | InvalidAlgorithmParameterException ex) {
+ throw new IllegalStateException("Could not get cipher", ex);
+ }
+
+ if (encodingOpts.isCompressed()) {
+ try (CipherInputStream cis = new CipherInputStream(inStream, cipher);
+ GZIPInputStream gzipIn = new GZIPInputStream(cis);
+ DataInputStream cdis = new DataInputStream(gzipIn)) {
+ return readMapAsUTF(cdis);
+ }
+ } else {
+ // read the property map keys, values.
+ try (CipherInputStream cis = new CipherInputStream(inStream, cipher);
+ DataInputStream cdis = new DataInputStream(cis)) {
+ return readMapAsUTF(cdis);
+ }
+ }
+ }
+ }
+
+ public static class GCMCipherParams {
+
+ private final SecretKey secretKey;
+ private final GCMParameterSpec parameterSpec;
+
+ public GCMCipherParams(final char[] pass, final byte[] salt)
+ throws NoSuchAlgorithmException, InvalidKeySpecException {
+
+ SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
+ KeySpec spec = new PBEKeySpec(pass, salt, 65536, 256);
+ secretKey = new SecretKeySpec(factory.generateSecret(spec).getEncoded(), "AES");
+
+ parameterSpec = buildGCMParameterSpec();
+ }
+
+ // utils
+ public static GCMParameterSpec buildGCMParameterSpec() {
+ byte[] iv = new byte[16];
+ new SecureRandom().nextBytes(iv);
+ return new GCMParameterSpec(128, iv);
+ }
+
+ public static GCMParameterSpec buildGCMParameterSpec(byte[] iv) {
+ return new GCMParameterSpec(128, iv);
+ }
+
+ public SecretKey getSecretKey() {
+ return secretKey;
+ }
+
+ public GCMParameterSpec getParameterSpec() {
+ return parameterSpec;
+ }
+
+ }
+
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java
new file mode 100644
index 0000000..63bd8d0
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.time.Instant;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VersionedPropEncryptCodecTest {
+
+ private final Logger log = LoggerFactory.getLogger(VersionedPropEncryptCodecTest.class);
+
+ /**
+ * Perform a round trip - encode, decode set of operations.
+ *
+ * @throws Exception
+ * an exception is a test failure.
+ */
+ @Test
+ public void roundTripSample() throws Exception {
+
+ // set-up sample "secret" key - for testing only.
+ final char[] pass = {'a', 'b', 'c'};
+ final byte[] salt = {1, 2, 3};
+
+ var cipherProps = new VersionedPropEncryptCodec.GCMCipherParams(pass, salt);
+
+ var cipher = Cipher.getInstance("AES/GCM/NoPadding");
+ cipher.init(Cipher.ENCRYPT_MODE, cipherProps.getSecretKey(), cipherProps.getParameterSpec());
+
+ byte[] payload;
+
+ try (var bos = new ByteArrayOutputStream()) {
+ var cos = new CipherOutputStream(bos, cipher);
+ var dos = new DataOutputStream(cos);
+
+ dos.writeUTF("A");
+ dos.writeUTF("B");
+ dos.writeUTF("C");
+
+ cos.close();
+
+ payload = bos.toByteArray();
+
+ log.debug("Output: {}", payload);
+
+ }
+
+ cipher.init(Cipher.DECRYPT_MODE, cipherProps.getSecretKey(), cipherProps.getParameterSpec());
+
+ try (var bis = new ByteArrayInputStream(payload)) {
+ // write the property map keys, values.
+ try (var cis = new CipherInputStream(bis, cipher);
+ var cdatastream = new DataInputStream(cis)) {
+
+ assertEquals("A", cdatastream.readUTF());
+ assertEquals("B", cdatastream.readUTF());
+ assertEquals("C", cdatastream.readUTF());
+ }
+ }
+ }
+
+ /**
+ * Validate versioning with something other than default.
+ */
+ @Test
+ public void roundTripEncryption() throws Exception {
+
+ int aVersion = 13;
+
+ VersionedProperties vProps =
+ new VersionedProperties(aVersion, Instant.now(), Map.of("k1", "v1"));
+
+ final char[] pass = {'a', 'b', 'c'};
+ final byte[] salt = {1, 2, 3};
+
+ var encoder = VersionedPropEncryptCodec.codec(false,
+ new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+ byte[] encodedBytes = encoder.toBytes(vProps);
+
+ log.debug("Encoded: {}", encodedBytes);
+
+ VersionedProperties decodedProps = encoder.fromBytes(encodedBytes);
+
+ log.debug("Decoded: {}", decodedProps.print(true));
+
+ assertEquals(vProps.getProperties(), decodedProps.getProperties());
+
+ // validate that the expected node version matches original version.
+ assertEquals(aVersion, vProps.getDataVersion());
+
+ // validate encoded version incremented.
+ assertEquals(aVersion + 1, decodedProps.getDataVersion());
+
+ assertEquals("encoded version should be 1 up", aVersion + 1, decodedProps.getDataVersion());
+ assertEquals("version written should be the source next version", vProps.getNextVersion(),
+ decodedProps.getDataVersion());
+ assertEquals("the next version in decoded should be +2", aVersion + 2,
+ decodedProps.getNextVersion());
+
+ assertTrue("timestamp should be now or earlier",
+ vProps.getTimestamp().compareTo(Instant.now()) <= 0);
+
+ }
+
+ /**
+ * Validate versioning with something other than default.
+ */
+ @Test
+ public void roundTripEncryptionCompressed() throws Exception {
+
+ int aVersion = 13;
+ Instant now = Instant.now();
+
+ // compression friendly
+ // @formatter:off
+ Map<String, String> p
+ = Map.of("accumulo.prop.key_name.1", "value1", "accumulo.prop.key_name.2",
+ "value2", "accumulo.prop.key_name.3", "value3", "accumulo.prop.key_name.4", "value4",
+ "accumulo.prop.key_name.5", "value5", "accumulo.prop.key_name.6", "value9");
+ // @@formatter:on
+ VersionedProperties vProps = new VersionedProperties(aVersion, now, p);
+
+ final char[] pass = {'a', 'b', 'c'};
+ final byte[] salt = {1, 2, 3};
+
+ VersionedPropCodec encoder1 = VersionedPropEncryptCodec.codec(true,
+ new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+ byte[] encodedBytes = encoder1.toBytes(vProps);
+
+ log.debug("len: {}, bytes: {}", encodedBytes.length, encodedBytes);
+
+ VersionedProperties decodedProps = encoder1.fromBytes(encodedBytes);
+
+ log.debug("Decoded: {}", decodedProps.print(true));
+
+ assertEquals(vProps.getProperties(), decodedProps.getProperties());
+
+ // validate that the expected node version matches original version.
+ assertEquals(aVersion, vProps.getDataVersion());
+
+ // validate encoded version incremented.
+ assertEquals(aVersion + 1, decodedProps.getDataVersion());
+
+ assertEquals("encoded version should be 1 up", aVersion + 1, decodedProps.getDataVersion());
+ assertEquals("version written should be the source next version", vProps.getNextVersion(),
+ decodedProps.getDataVersion());
+ assertEquals("the next version in decoded should be +2", aVersion + 2,
+ decodedProps.getNextVersion());
+
+ assertTrue("timestamp should be now or earlier",
+ vProps.getTimestamp().compareTo(Instant.now()) <= 0);
+
+ }
+
+ @Test
+ public void validateEncryptedValuesChange() throws Exception {
+
+ int aVersion = 13;
+
+ VersionedProperties vProps =
+ new VersionedProperties(aVersion, Instant.now(), Map.of("k1", "v1"));
+
+ final char[] pass = {'a', 'b', 'c'};
+ final byte[] salt = {1, 2, 3};
+
+ VersionedPropCodec codec1 = VersionedPropEncryptCodec.codec(false,
+ new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+ byte[] encodedBytes1 = codec1.toBytes(vProps);
+
+ VersionedPropCodec codec2 = VersionedPropEncryptCodec.codec(false,
+ new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+ byte[] encodedBytes2 = codec2.toBytes(vProps);
+
+ log.debug("Encoded: {}", encodedBytes1);
+ log.debug("Encoded: {}", encodedBytes2);
+
+ VersionedProperties from2 = codec1.fromBytes(encodedBytes2);
+ VersionedProperties from1 = codec2.fromBytes(encodedBytes1);
+
+ assertEquals(from1.getProperties(), from2.getProperties());
+
+ VersionedPropCodec codec3 = VersionedPropEncryptCodec.codec(false,
+ new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+ VersionedProperties from3 = codec3.fromBytes(encodedBytes1);
+ assertEquals(from1.getDataVersion(), from3.getDataVersion());
+ assertEquals(from1.getProperties(), from3.getProperties());
+
+ assertNotEquals(encodedBytes1, encodedBytes2);
+
+ }
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropGzipCodecTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropGzipCodecTest.java
new file mode 100644
index 0000000..d8561c7
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropGzipCodecTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Map;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercise the {@link VersionedPropGzipCodec} class.
+ */
+public class VersionedPropGzipCodecTest {
+
+ private static final Logger log = LoggerFactory.getLogger(VersionedPropGzipCodecTest.class);
+
+ @Test
+ public void roundTripUncompressed() throws IOException {
+ VersionedProperties vProps = new VersionedProperties(Map.of("k1", "v1"));
+
+ VersionedPropCodec encoder = VersionedPropGzipCodec.codec(false);
+
+ byte[] encodedMapBytes = encoder.toBytes(vProps);
+
+ VersionedProperties decodedProps = encoder.fromBytes(encodedMapBytes);
+
+ log.debug("Decoded: {}", decodedProps.getProperties());
+
+ // default - first write version should be 0
+ assertEquals("default - first write version should be 0", 0, decodedProps.getDataVersion());
+ assertEquals("default - first write next version should be 1", 1,
+ decodedProps.getNextVersion());
+ assertTrue("timestamp should be now or earlier",
+ vProps.getTimestamp().compareTo(Instant.now()) <= 0);
+ assertEquals(vProps.getProperties(), decodedProps.getProperties());
+ }
+
+ @Test
+ public void roundTripCompressed() throws IOException {
+ VersionedProperties vProps = new VersionedProperties(Map.of("k1", "v1"));
+
+ VersionedPropCodec codec = VersionedPropGzipCodec.codec(true);
+
+ byte[] encodedMapBytes = codec.toBytes(vProps);
+
+ VersionedProperties decodedProps = codec.fromBytes(encodedMapBytes);
+
+ log.debug("Decoded: {}", decodedProps.getProperties());
+
+ assertEquals("default - first write version should be 0", 0, decodedProps.getDataVersion());
+ assertEquals("default - first write next version should be 1", 1,
+ decodedProps.getNextVersion());
+ assertTrue("timestamp should be now or earlier",
+ vProps.getTimestamp().compareTo(Instant.now()) <= 0);
+ assertEquals(vProps.getProperties(), decodedProps.getProperties());
+ }
+
+ /**
+ * Validate versioning with something other than default.
+ */
+ @Test
+ public void roundTripVersioning() throws IOException {
+
+ int aVersion = 13;
+ VersionedProperties vProps =
+ new VersionedProperties(aVersion, Instant.now(), Map.of("k1", "v1"));
+
+ VersionedPropCodec codec = VersionedPropGzipCodec.codec(true);
+ byte[] encodedBytes = codec.toBytes(vProps);
+
+ VersionedProperties decodedProps = codec.fromBytes(encodedBytes);
+
+ log.trace("Decoded: {}", decodedProps.print(true));
+
+ assertEquals(vProps.getProperties(), decodedProps.getProperties());
+
+ // validate that the expected node version matches original version.
+ assertEquals(aVersion, vProps.getDataVersion());
+
+ // validate encoded version incremented.
+ assertEquals(aVersion + 1, decodedProps.getDataVersion());
+
+ assertEquals("encoded version should be 1 up", aVersion + 1, decodedProps.getDataVersion());
+ assertEquals("version written should be the source next version", vProps.getNextVersion(),
+ decodedProps.getDataVersion());
+ assertEquals("the next version in decoded should be +2", aVersion + 2,
+ decodedProps.getNextVersion());
+
+ assertTrue("timestamp should be now or earlier",
+ vProps.getTimestamp().compareTo(Instant.now()) <= 0);
+ }
+
+ @Test
+ public void roundTrip2() throws IOException {
+
+ int aVersion = 13;
+ VersionedProperties vProps =
+ new VersionedProperties(aVersion, Instant.now(), Map.of("k1", "v1"));
+
+ VersionedPropCodec codec = VersionedPropGzipCodec.codec(true);
+ byte[] encodedBytes = codec.toBytes(vProps);
+
+ VersionedProperties decodedProps = codec.fromBytes(encodedBytes);
+
+ log.debug("Decoded: {}", decodedProps.print(true));
+
+ assertEquals(vProps.getProperties(), decodedProps.getProperties());
+
+ }
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropertiesTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropertiesTest.java
new file mode 100644
index 0000000..c93d63d
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropertiesTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VersionedPropertiesTest {
+
+ private static final Logger log = LoggerFactory.getLogger(VersionedPropertiesTest.class);
+
+ @Test
+ public void initProperties() {
+ Map<String,String> initProps = new HashMap<>();
+ initProps.put("key1", "value1");
+ initProps.put("key2", "value2");
+ initProps.put("key3", "value3");
+ VersionedProperties vProps = new VersionedProperties(initProps);
+
+ Map<String,String> propMap = vProps.getProperties();
+
+ assertEquals(initProps.size(), propMap.size());
+
+ assertEquals("value1", propMap.get("key1"));
+ assertEquals("value2", propMap.get("key2"));
+ assertEquals("value3", propMap.get("key3"));
+
+ // invalid key
+ assertNull(propMap.get("key4"));
+
+ }
+
+ @Test
+ public void emptyProps() {
+ VersionedProperties vProps = new VersionedProperties();
+
+ assertNotNull(vProps);
+ assertEquals(0, vProps.getProperties().size());
+ assertNull(vProps.getProperties().get("key1"));
+ assertEquals(Collections.emptyMap(), vProps.getProperties());
+ }
+
+ @Test
+ public void nullProps() {
+ VersionedProperties vProps = new VersionedProperties(2, Instant.now(), null);
+ assertNotNull(vProps);
+ }
+
+ @Test
+ public void initialProps() {
+
+ Map<String,String> aMap = new HashMap<>();
+ aMap.put("k1", "v1");
+ aMap.put("k2", "v2");
+
+ VersionedProperties vProps = new VersionedProperties(aMap);
+
+ Map<String,String> rMap = vProps.getProperties();
+ assertEquals(aMap.size(), rMap.size());
+
+ assertThrows(UnsupportedOperationException.class, () -> rMap.put("k3", "v3"));
+
+ }
+
+ @Test
+ public void updateSingleProp() {
+
+ VersionedProperties vProps = new VersionedProperties();
+ vProps = vProps.addOrUpdate("k1", "v1");
+
+ assertEquals("v1", vProps.getProperties().get("k1"));
+ assertEquals(1, vProps.getProperties().size());
+
+ vProps = vProps.addOrUpdate("k1", "v1-2");
+
+ assertEquals("v1-2", vProps.getProperties().get("k1"));
+ }
+
+ @Test
+ public void updateProps() {
+
+ Map<String,String> aMap = new HashMap<>();
+ aMap.put("k1", "v1");
+ aMap.put("k2", "v2");
+
+ VersionedProperties vProps = new VersionedProperties(aMap);
+
+ assertEquals("v1", vProps.getProperties().get("k1"));
+ assertEquals(aMap.size(), vProps.getProperties().size());
+
+ Map<String,String> bMap = new HashMap<>();
+ bMap.put("k1", "v1-1");
+ bMap.put("k3", "v3");
+
+ VersionedProperties updated = vProps.addOrUpdate(bMap);
+
+ assertEquals(2, vProps.getProperties().size());
+ assertEquals(3, updated.getProperties().size());
+
+ assertEquals("v1-1", updated.getProperties().get("k1"));
+
+ }
+
+ @Test
+ public void removeProps() {
+
+ Map<String,String> aMap = new HashMap<>();
+ aMap.put("k1", "v1");
+ aMap.put("k2", "v2");
+
+ VersionedProperties vProps = new VersionedProperties(aMap);
+
+ assertEquals("v1", vProps.getProperties().get("k1"));
+ assertEquals(aMap.size(), vProps.getProperties().size());
+
+ // remove 1 existing and 1 not present
+ VersionedProperties vProps2 = vProps.remove(Arrays.asList("k1", "k3"));
+
+ assertEquals(1, vProps2.getProperties().size());
+ assertNull(vProps2.getProperties().get("k1"));
+ assertEquals("v2", vProps2.getProperties().get("k2"));
+ }
+
+ @Test
+ public void getInitialDataVersion() {
+ VersionedProperties vProps = new VersionedProperties();
+ assertEquals(0, vProps.getDataVersion());
+
+ // the initial version for write should be 0
+ assertEquals("Initial expected version should be 0", 0, vProps.getNextVersion());
+ assertTrue("timestamp should be now or earlier",
+ vProps.getTimestamp().compareTo(Instant.now()) <= 0);
+ }
+
+ @Test
+ public void prettyTest() {
+ Map<String,String> aMap = new HashMap<>();
+ aMap.put("k1", "v1");
+ aMap.put("k2", "v2");
+
+ VersionedProperties vProps = new VersionedProperties(aMap);
+ assertFalse(vProps.toString().contains("\n"));
+ assertTrue(vProps.print(true).contains("\n"));
+ }
+
+ @Test
+ public void isoTimestamp() {
+ VersionedProperties vProps = new VersionedProperties();
+ log.trace("timestamp: {}", vProps.getTimestampISO());
+ assertTrue(vProps.getTimestampISO().endsWith("Z"));
+ }
+}