You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/20 02:15:28 UTC

[3/3] git commit: PHOENIX-1344 fix duplicate values in NTH_VALUE function (Vaclav Loffelmann)

PHOENIX-1344 fix duplicate values in NTH_VALUE function (Vaclav Loffelmann)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7835f41e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7835f41e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7835f41e

Branch: refs/heads/master
Commit: 7835f41efd74664f73bed866d40a958d43e0ace2
Parents: de67b9c
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 19 17:18:21 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Oct 19 17:18:21 2014 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/NthValueFunctionIT.java     | 150 +++++++++++++++++++
 .../FirstLastValueBaseClientAggregator.java     |  37 +++--
 .../FirstLastValueServerAggregator.java         |  46 ++++--
 .../util/FirstLastNthValueDataContainer.java    |  55 ++++---
 4 files changed, 244 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7835f41e/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
index 1cf2643..ec5bf96 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.util.ArrayList;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -152,4 +153,153 @@ public class NthValueFunctionIT extends BaseHBaseManagedTimeIT {
         assertFalse(rs.next());
     }
 
+    @Test
+    public void nonUniqueValuesInOrderByAsc() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 3) WITHIN GROUP (ORDER BY dates ASC) FROM nthValue GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertInIntArray(new int[]{2, 4, 9}, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void nonUniqueValuesInOrderByAscSkipDuplicit() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 5) WITHIN GROUP (ORDER BY dates ASC) FROM nthValue GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertEquals(3, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void nonUniqueValuesInOrderByDesc() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 3) WITHIN GROUP (ORDER BY dates DESC) FROM nthValue GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertInIntArray(new int[]{2, 4, 9}, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void nonUniqueValuesInOrderNextValueDesc() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 0, 7)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 1, 9)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (7, 8, 3, 5)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 2) WITHIN GROUP (ORDER BY dates DESC) FROM nthValue GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertInIntArray(new int[]{3, 5}, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void nonUniqueValuesInOrderNextValueAsc() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 0, 7)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 1, 9)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (7, 8, 3, 5)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 5) WITHIN GROUP (ORDER BY dates ASC) FROM nthValue GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertInIntArray(new int[]{3, 5}, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void ignoreNullValues() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nth_test_table "
+                + "(id INTEGER NOT NULL, page_id UNSIGNED_LONG,"
+                + " dates BIGINT NOT NULL, \"value\" BIGINT NULL CONSTRAINT pk PRIMARY KEY (id, dates))";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (1, 8, 1, 1)");
+        conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (2, 8, 2, NULL)");
+        conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (3, 8, 3, NULL)");
+        conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (5, 8, 4, 4)");
+        conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (4, 8, 5, 5)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT NTH_VALUE(\"value\", 2)  WITHIN GROUP (ORDER BY dates DESC) FROM nth_test_table GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertEquals(rs.getLong(1), 4);
+        assertFalse(rs.next());
+    }
+
+    private void assertInIntArray(int[] should, int actualValue) {
+        ArrayList<Integer> shouldList = new ArrayList<Integer>();
+        for (int i: should) {
+            shouldList.add(i);
+        }
+        assertTrue(shouldList.contains(actualValue));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7835f41e/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
index fde79ba..6dfca39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.expression.aggregator;
 
+import java.util.LinkedList;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -42,7 +44,7 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
     protected int offset = -1;
     protected BinaryComparator topOrder = new BinaryComparator(ByteUtil.EMPTY_BYTE_ARRAY);
     protected byte[] topValue = null;
-    protected TreeMap<byte[], byte[]> topValues = new TreeMap<byte[], byte[]>(new ByteArrayComparator());
+    protected TreeMap<byte[], LinkedList<byte[]>> topValues = new TreeMap<byte[], LinkedList<byte[]>>(new ByteArrayComparator());
     protected boolean isAscending;
 
     public FirstLastValueBaseClientAggregator() {
@@ -63,7 +65,7 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
                 return false;
             }
 
-            Set<Map.Entry<byte[], byte[]>> entrySet;
+            Set<Map.Entry<byte[], LinkedList<byte[]>>> entrySet;
             if (isAscending) {
                 entrySet = topValues.entrySet();
             } else {
@@ -71,10 +73,14 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
             }
 
             int counter = offset;
-            for (Map.Entry<byte[], byte[]> entry : entrySet) {
-                if (--counter == 0) {
-                    ptr.set(entry.getValue());
-                    return true;
+            for (Map.Entry<byte[], LinkedList<byte[]>> entry : entrySet) {
+                ListIterator<byte[]> it = entry.getValue().listIterator();
+                while (it.hasNext()) {
+                    if (--counter == 0) {
+                        ptr.set(it.next());
+                        return true;
+                    }
+                    it.next();
                 }
             }
 
@@ -103,13 +109,22 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
 
         payload.setPayload(ptr.copyBytes());
         isAscending = payload.getIsAscending();
-        TreeMap serverAggregatorResult = payload.getData();
+        TreeMap<byte[], LinkedList<byte[]>> serverAggregatorResult = payload.getData();
 
         if (useOffset) {
-            payload.setOffset(offset);
-            topValues.putAll(serverAggregatorResult);
+            //merge topValues
+            for (Entry<byte[], LinkedList<byte[]>> entry : serverAggregatorResult.entrySet()) {
+                byte[] itemKey = entry.getKey();
+                LinkedList<byte[]> itemList = entry.getValue();
+
+                if (topValues.containsKey(itemKey)) {
+                    topValues.get(itemKey).addAll(itemList);
+                } else {
+                    topValues.put(itemKey, itemList);
+                }
+            }
         } else {
-            Entry<byte[], byte[]> valueEntry = serverAggregatorResult.firstEntry();
+            Entry<byte[], LinkedList<byte[]>> valueEntry = serverAggregatorResult.firstEntry();
             byte[] currentOrder = valueEntry.getKey();
 
             boolean isBetter;
@@ -120,7 +135,7 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
             }
             if (topOrder.getValue().length < 1 || isBetter) {
                 topOrder = new BinaryComparator(currentOrder);
-                topValue = valueEntry.getValue();
+                topValue = valueEntry.getValue().getFirst();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7835f41e/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
index 90b7826..5e51e07 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SizedUtil;
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.TreeMap;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -45,11 +46,12 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
     protected byte[] topValue;
     protected boolean useOffset = false;
     protected int offset = -1;
-    protected TreeMap<byte[], byte[]> topValues = new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
+    protected TreeMap<byte[], LinkedList<byte[]>> topValues = new TreeMap<byte[], LinkedList<byte[]>>(new Bytes.ByteArrayComparator());
     protected boolean isAscending;
     protected boolean hasValueDescSortOrder;
     protected Expression orderByColumn;
     protected Expression dataColumn;
+    protected int topValuesCount = 0;
 
     public FirstLastValueServerAggregator() {
         super(SortOrder.getDefault());
@@ -60,6 +62,7 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
         topOrder = new BinaryComparator(ByteUtil.EMPTY_BYTE_ARRAY);
         topValue = null;
         topValues.clear();
+        topValuesCount = 0;
         offset = -1;
         useOffset = false;
     }
@@ -81,7 +84,7 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
 
         if (useOffset) {
             boolean addFlag = false;
-            if (topValues.size() < offset) {
+            if (topValuesCount < offset) {
                 try {
                     addFlag = true;
                 } catch (Exception e) {
@@ -89,25 +92,27 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
                 }
             } else {
                 if (isAscending) {
-                    byte[] lowestKey = topValues.lastKey();
-                    if (Bytes.compareTo(currentOrder, lowestKey) < 0) {
-                        topValues.remove(lowestKey);
+                    if (removeLastElement(currentOrder, topValues.lastKey(), -1)) {
                         addFlag = true;
+                        topValuesCount--;
                     }
-                } else { //desc
-                    byte[] highestKey = topValues.firstKey();
-                    if (Bytes.compareTo(currentOrder, highestKey) > 0) {
-                        topValues.remove(highestKey);
+                } else {
+                    if (removeLastElement(currentOrder, topValues.firstKey(), 1)) {
                         addFlag = true;
+                        topValuesCount--;
                     }
                 }
             }
             if (addFlag) {
+                topValuesCount++;
+                if (!topValues.containsKey(currentOrder)) {
+                    topValues.put(currentOrder, new LinkedList<byte[]>());
+                }
                 //invert bytes if is SortOrder set
                 if (hasValueDescSortOrder) {
-                    topValues.put(currentOrder, SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength()));
+                    topValues.get(currentOrder).push(SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength()));
                 } else {
-                    topValues.put(currentOrder, ptr.copyBytes());
+                    topValues.get(currentOrder).push(ptr.copyBytes());
                 }
             }
         } else {
@@ -158,14 +163,17 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
         if (useOffset) {
             payload.setOffset(offset);
 
-            if (topValues.size() == 0) {
+            if (topValuesCount == 0) {
                 return false;
             }
         } else {
             if (topValue == null) {
                 return false;
             }
-            topValues.put(topOrder.getValue(), topValue);
+
+            LinkedList<byte[]> topValueList = new LinkedList<byte[]>();
+            topValueList.push(topValue);
+            topValues.put(topOrder.getValue(), topValueList);
         }
         payload.setData(topValues);
 
@@ -202,4 +210,16 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
             this.isAscending = isAscending;
         }
     }
+
+    private boolean removeLastElement(byte[] currentOrder, byte[] lowestKey, int sortOrderInt) {
+        if (Bytes.compareTo(currentOrder, lowestKey) * sortOrderInt >= 0) {
+            if (topValues.get(lowestKey).size() == 1) {
+                topValues.remove(lowestKey);
+            } else {
+                topValues.get(lowestKey).pollFirst();
+            }
+            return true;
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7835f41e/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java b/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
index 562f189..b358dce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
@@ -19,6 +19,8 @@ package org.apache.phoenix.util;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
@@ -32,7 +34,7 @@ public class FirstLastNthValueDataContainer {
 
     protected boolean isAscending = false;
     protected int offset;
-    protected TreeMap<byte[], byte[]> data;
+    protected TreeMap<byte[], LinkedList<byte[]>> data;
     protected boolean isOrderValuesFixedLength = false;
     protected boolean isDataValuesFixedLength = false;
 
@@ -40,7 +42,7 @@ public class FirstLastNthValueDataContainer {
         isAscending = ascending;
     }
 
-    public void setData(TreeMap<byte[], byte[]> topValues) {
+    public void setData(TreeMap<byte[], LinkedList<byte[]>> topValues) {
         data = topValues;
     }
 
@@ -65,7 +67,7 @@ public class FirstLastNthValueDataContainer {
         int lengthOfDataValues = Bytes.toInt(payload, 5);
         int sizeOfMap = Bytes.toInt(payload, 9);
 
-        data = new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
+        data = new TreeMap<byte[], LinkedList<byte[]>>(new Bytes.ByteArrayComparator());
 
         int payloadOffset = 13;
 
@@ -93,7 +95,10 @@ public class FirstLastNthValueDataContainer {
                 payloadOffset += l;
             }
 
-            data.put(key, value);
+            if(!data.containsKey(key)) {
+                data.put(key, new LinkedList<byte[]>());
+            }
+            data.get(key).add(value);
         }
 
     }
@@ -127,7 +132,7 @@ public class FirstLastNthValueDataContainer {
 
         bos.write(isAscending ? (byte) 1 : (byte) 0);
 
-        Entry<byte[], byte[]> firstEntry = data.firstEntry();
+        Entry<byte[], LinkedList<byte[]>> firstEntry = data.firstEntry();
         if (isOrderValuesFixedLength) {
             bos.write(Bytes.toBytes(firstEntry.getKey().length));
         } else {
@@ -135,34 +140,44 @@ public class FirstLastNthValueDataContainer {
         }
 
         if (isDataValuesFixedLength) {
-            bos.write(Bytes.toBytes(firstEntry.getValue().length));
+            bos.write(Bytes.toBytes(firstEntry.getValue().getFirst().length));
         } else {
             bos.write(Bytes.toBytes(0));
         }
 
-        bos.write(Bytes.toBytes(data.size()));
-
-        for (Map.Entry<byte[], byte[]> entry : data.entrySet()) {
-
-            if (!isOrderValuesFixedLength) {
-                bos.write(Bytes.toBytes(entry.getKey().length));
-            }
-            bos.write(entry.getKey());
-
-            if (!isDataValuesFixedLength) {
-                bos.write(Bytes.toBytes(entry.getValue().length));
+        int offsetForDataLength = bos.size();
+        bos.write(new byte[4]); //space for number of elements
+        int valuesCount = 0;
+
+        for (Map.Entry<byte[], LinkedList<byte[]>> entry : data.entrySet()) {
+            ListIterator<byte[]> it = entry.getValue().listIterator();
+            while(it.hasNext()) {
+                valuesCount++;
+                byte[] itemValue = it.next();
+
+                if (!isOrderValuesFixedLength) {
+                    bos.write(Bytes.toBytes(entry.getKey().length));
+                }
+                bos.write(entry.getKey());
+
+                if (!isDataValuesFixedLength) {
+                    bos.write(Bytes.toBytes(itemValue.length));
+                }
+                bos.write(itemValue);
             }
-            bos.write(entry.getValue());
         }
 
-        return bos.toByteArray();
+        byte[] outputArray = bos.toByteArray();
+        //write number of elements
+        System.arraycopy(Bytes.toBytes(valuesCount), 0, outputArray, offsetForDataLength, 4);
+        return outputArray;
     }
 
     public boolean getIsAscending() {
         return isAscending;
     }
 
-    public TreeMap getData() {
+    public TreeMap<byte[], LinkedList<byte[]>> getData() {
         return data;
     }
 }