You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC

svn commit: r749207 [10/12] - in /incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/ net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/

Added: incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_super_t.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_super_t.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_super_t.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_super_t.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,287 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+public class batch_mutation_super_t implements TBase, java.io.Serializable {
+  public String table;
+  public String key;
+  public Map<String,List<superColumn_t>> cfmap;
+  public Map<String,List<superColumn_t>> cfmapdel;
+
+  public final Isset __isset = new Isset();
+  public static final class Isset implements java.io.Serializable {
+    public boolean table = false;
+    public boolean key = false;
+    public boolean cfmap = false;
+    public boolean cfmapdel = false;
+  }
+
+  public batch_mutation_super_t() {
+  }
+
+  public batch_mutation_super_t(
+    String table,
+    String key,
+    Map<String,List<superColumn_t>> cfmap,
+    Map<String,List<superColumn_t>> cfmapdel)
+  {
+    this();
+    this.table = table;
+    this.__isset.table = true;
+    this.key = key;
+    this.__isset.key = true;
+    this.cfmap = cfmap;
+    this.__isset.cfmap = true;
+    this.cfmapdel = cfmapdel;
+    this.__isset.cfmapdel = true;
+  }
+
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof batch_mutation_super_t)
+      return this.equals((batch_mutation_super_t)that);
+    return false;
+  }
+
+  public boolean equals(batch_mutation_super_t that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_table = true && (this.table != null);
+    boolean that_present_table = true && (that.table != null);
+    if (this_present_table || that_present_table) {
+      if (!(this_present_table && that_present_table))
+        return false;
+      if (!this.table.equals(that.table))
+        return false;
+    }
+
+    boolean this_present_key = true && (this.key != null);
+    boolean that_present_key = true && (that.key != null);
+    if (this_present_key || that_present_key) {
+      if (!(this_present_key && that_present_key))
+        return false;
+      if (!this.key.equals(that.key))
+        return false;
+    }
+
+    boolean this_present_cfmap = true && (this.cfmap != null);
+    boolean that_present_cfmap = true && (that.cfmap != null);
+    if (this_present_cfmap || that_present_cfmap) {
+      if (!(this_present_cfmap && that_present_cfmap))
+        return false;
+      if (!this.cfmap.equals(that.cfmap))
+        return false;
+    }
+
+    boolean this_present_cfmapdel = true && (this.cfmapdel != null);
+    boolean that_present_cfmapdel = true && (that.cfmapdel != null);
+    if (this_present_cfmapdel || that_present_cfmapdel) {
+      if (!(this_present_cfmapdel && that_present_cfmapdel))
+        return false;
+      if (!this.cfmapdel.equals(that.cfmapdel))
+        return false;
+    }
+
+    return true;
+  }
+
+  public int hashCode() {
+    return 0;
+  }
+
+  public void read(TProtocol iprot) throws TException {
+    TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == TType.STOP) { 
+        break;
+      }
+      switch (field.id)
+      {
+        case 1:
+          if (field.type == TType.STRING) {
+            this.table = iprot.readString();
+            this.__isset.table = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2:
+          if (field.type == TType.STRING) {
+            this.key = iprot.readString();
+            this.__isset.key = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 3:
+          if (field.type == TType.MAP) {
+            {
+              TMap _map22 = iprot.readMapBegin();
+              this.cfmap = new HashMap<String,List<superColumn_t>>(2*_map22.size);
+              for (int _i23 = 0; _i23 < _map22.size; ++_i23)
+              {
+                String _key24;
+                List<superColumn_t> _val25;
+                _key24 = iprot.readString();
+                {
+                  TList _list26 = iprot.readListBegin();
+                  _val25 = new ArrayList<superColumn_t>(_list26.size);
+                  for (int _i27 = 0; _i27 < _list26.size; ++_i27)
+                  {
+                    superColumn_t _elem28 = new superColumn_t();
+                    _elem28 = new superColumn_t();
+                    _elem28.read(iprot);
+                    _val25.add(_elem28);
+                  }
+                  iprot.readListEnd();
+                }
+                this.cfmap.put(_key24, _val25);
+              }
+              iprot.readMapEnd();
+            }
+            this.__isset.cfmap = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 4:
+          if (field.type == TType.MAP) {
+            {
+              TMap _map29 = iprot.readMapBegin();
+              this.cfmapdel = new HashMap<String,List<superColumn_t>>(2*_map29.size);
+              for (int _i30 = 0; _i30 < _map29.size; ++_i30)
+              {
+                String _key31;
+                List<superColumn_t> _val32;
+                _key31 = iprot.readString();
+                {
+                  TList _list33 = iprot.readListBegin();
+                  _val32 = new ArrayList<superColumn_t>(_list33.size);
+                  for (int _i34 = 0; _i34 < _list33.size; ++_i34)
+                  {
+                    superColumn_t _elem35 = new superColumn_t();
+                    _elem35 = new superColumn_t();
+                    _elem35.read(iprot);
+                    _val32.add(_elem35);
+                  }
+                  iprot.readListEnd();
+                }
+                this.cfmapdel.put(_key31, _val32);
+              }
+              iprot.readMapEnd();
+            }
+            this.__isset.cfmapdel = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          TProtocolUtil.skip(iprot, field.type);
+          break;
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+  }
+
+  public void write(TProtocol oprot) throws TException {
+    TStruct struct = new TStruct("batch_mutation_super_t");
+    oprot.writeStructBegin(struct);
+    TField field = new TField();
+    if (this.table != null) {
+      field.name = "table";
+      field.type = TType.STRING;
+      field.id = 1;
+      oprot.writeFieldBegin(field);
+      oprot.writeString(this.table);
+      oprot.writeFieldEnd();
+    }
+    if (this.key != null) {
+      field.name = "key";
+      field.type = TType.STRING;
+      field.id = 2;
+      oprot.writeFieldBegin(field);
+      oprot.writeString(this.key);
+      oprot.writeFieldEnd();
+    }
+    if (this.cfmap != null) {
+      field.name = "cfmap";
+      field.type = TType.MAP;
+      field.id = 3;
+      oprot.writeFieldBegin(field);
+      {
+        oprot.writeMapBegin(new TMap(TType.STRING, TType.LIST, this.cfmap.size()));
+        for (String _iter36 : this.cfmap.keySet())        {
+          oprot.writeString(_iter36);
+          {
+            oprot.writeListBegin(new TList(TType.STRUCT, this.cfmap.get(_iter36).size()));
+            for (superColumn_t _iter37 : this.cfmap.get(_iter36))            {
+              _iter37.write(oprot);
+            }
+            oprot.writeListEnd();
+          }
+        }
+        oprot.writeMapEnd();
+      }
+      oprot.writeFieldEnd();
+    }
+    if (this.cfmapdel != null) {
+      field.name = "cfmapdel";
+      field.type = TType.MAP;
+      field.id = 4;
+      oprot.writeFieldBegin(field);
+      {
+        oprot.writeMapBegin(new TMap(TType.STRING, TType.LIST, this.cfmapdel.size()));
+        for (String _iter38 : this.cfmapdel.keySet())        {
+          oprot.writeString(_iter38);
+          {
+            oprot.writeListBegin(new TList(TType.STRUCT, this.cfmapdel.get(_iter38).size()));
+            for (superColumn_t _iter39 : this.cfmapdel.get(_iter38))            {
+              _iter39.write(oprot);
+            }
+            oprot.writeListEnd();
+          }
+        }
+        oprot.writeMapEnd();
+      }
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("batch_mutation_super_t(");
+    sb.append("table:");
+    sb.append(this.table);
+    sb.append(",key:");
+    sb.append(this.key);
+    sb.append(",cfmap:");
+    sb.append(this.cfmap);
+    sb.append(",cfmapdel:");
+    sb.append(this.cfmapdel);
+    sb.append(")");
+    return sb.toString();
+  }
+
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_t.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_t.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_t.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_t.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,287 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+public class batch_mutation_t implements TBase, java.io.Serializable {
+  public String table;
+  public String key;
+  public Map<String,List<column_t>> cfmap;
+  public Map<String,List<column_t>> cfmapdel;
+
+  public final Isset __isset = new Isset();
+  public static final class Isset implements java.io.Serializable {
+    public boolean table = false;
+    public boolean key = false;
+    public boolean cfmap = false;
+    public boolean cfmapdel = false;
+  }
+
+  public batch_mutation_t() {
+  }
+
+  public batch_mutation_t(
+    String table,
+    String key,
+    Map<String,List<column_t>> cfmap,
+    Map<String,List<column_t>> cfmapdel)
+  {
+    this();
+    this.table = table;
+    this.__isset.table = true;
+    this.key = key;
+    this.__isset.key = true;
+    this.cfmap = cfmap;
+    this.__isset.cfmap = true;
+    this.cfmapdel = cfmapdel;
+    this.__isset.cfmapdel = true;
+  }
+
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof batch_mutation_t)
+      return this.equals((batch_mutation_t)that);
+    return false;
+  }
+
+  public boolean equals(batch_mutation_t that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_table = true && (this.table != null);
+    boolean that_present_table = true && (that.table != null);
+    if (this_present_table || that_present_table) {
+      if (!(this_present_table && that_present_table))
+        return false;
+      if (!this.table.equals(that.table))
+        return false;
+    }
+
+    boolean this_present_key = true && (this.key != null);
+    boolean that_present_key = true && (that.key != null);
+    if (this_present_key || that_present_key) {
+      if (!(this_present_key && that_present_key))
+        return false;
+      if (!this.key.equals(that.key))
+        return false;
+    }
+
+    boolean this_present_cfmap = true && (this.cfmap != null);
+    boolean that_present_cfmap = true && (that.cfmap != null);
+    if (this_present_cfmap || that_present_cfmap) {
+      if (!(this_present_cfmap && that_present_cfmap))
+        return false;
+      if (!this.cfmap.equals(that.cfmap))
+        return false;
+    }
+
+    boolean this_present_cfmapdel = true && (this.cfmapdel != null);
+    boolean that_present_cfmapdel = true && (that.cfmapdel != null);
+    if (this_present_cfmapdel || that_present_cfmapdel) {
+      if (!(this_present_cfmapdel && that_present_cfmapdel))
+        return false;
+      if (!this.cfmapdel.equals(that.cfmapdel))
+        return false;
+    }
+
+    return true;
+  }
+
+  public int hashCode() {
+    return 0;
+  }
+
+  public void read(TProtocol iprot) throws TException {
+    TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == TType.STOP) { 
+        break;
+      }
+      switch (field.id)
+      {
+        case 1:
+          if (field.type == TType.STRING) {
+            this.table = iprot.readString();
+            this.__isset.table = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2:
+          if (field.type == TType.STRING) {
+            this.key = iprot.readString();
+            this.__isset.key = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 3:
+          if (field.type == TType.MAP) {
+            {
+              TMap _map0 = iprot.readMapBegin();
+              this.cfmap = new HashMap<String,List<column_t>>(2*_map0.size);
+              for (int _i1 = 0; _i1 < _map0.size; ++_i1)
+              {
+                String _key2;
+                List<column_t> _val3;
+                _key2 = iprot.readString();
+                {
+                  TList _list4 = iprot.readListBegin();
+                  _val3 = new ArrayList<column_t>(_list4.size);
+                  for (int _i5 = 0; _i5 < _list4.size; ++_i5)
+                  {
+                    column_t _elem6 = new column_t();
+                    _elem6 = new column_t();
+                    _elem6.read(iprot);
+                    _val3.add(_elem6);
+                  }
+                  iprot.readListEnd();
+                }
+                this.cfmap.put(_key2, _val3);
+              }
+              iprot.readMapEnd();
+            }
+            this.__isset.cfmap = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 4:
+          if (field.type == TType.MAP) {
+            {
+              TMap _map7 = iprot.readMapBegin();
+              this.cfmapdel = new HashMap<String,List<column_t>>(2*_map7.size);
+              for (int _i8 = 0; _i8 < _map7.size; ++_i8)
+              {
+                String _key9;
+                List<column_t> _val10;
+                _key9 = iprot.readString();
+                {
+                  TList _list11 = iprot.readListBegin();
+                  _val10 = new ArrayList<column_t>(_list11.size);
+                  for (int _i12 = 0; _i12 < _list11.size; ++_i12)
+                  {
+                    column_t _elem13 = new column_t();
+                    _elem13 = new column_t();
+                    _elem13.read(iprot);
+                    _val10.add(_elem13);
+                  }
+                  iprot.readListEnd();
+                }
+                this.cfmapdel.put(_key9, _val10);
+              }
+              iprot.readMapEnd();
+            }
+            this.__isset.cfmapdel = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          TProtocolUtil.skip(iprot, field.type);
+          break;
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+  }
+
+  public void write(TProtocol oprot) throws TException {
+    TStruct struct = new TStruct("batch_mutation_t");
+    oprot.writeStructBegin(struct);
+    TField field = new TField();
+    if (this.table != null) {
+      field.name = "table";
+      field.type = TType.STRING;
+      field.id = 1;
+      oprot.writeFieldBegin(field);
+      oprot.writeString(this.table);
+      oprot.writeFieldEnd();
+    }
+    if (this.key != null) {
+      field.name = "key";
+      field.type = TType.STRING;
+      field.id = 2;
+      oprot.writeFieldBegin(field);
+      oprot.writeString(this.key);
+      oprot.writeFieldEnd();
+    }
+    if (this.cfmap != null) {
+      field.name = "cfmap";
+      field.type = TType.MAP;
+      field.id = 3;
+      oprot.writeFieldBegin(field);
+      {
+        oprot.writeMapBegin(new TMap(TType.STRING, TType.LIST, this.cfmap.size()));
+        for (String _iter14 : this.cfmap.keySet())        {
+          oprot.writeString(_iter14);
+          {
+            oprot.writeListBegin(new TList(TType.STRUCT, this.cfmap.get(_iter14).size()));
+            for (column_t _iter15 : this.cfmap.get(_iter14))            {
+              _iter15.write(oprot);
+            }
+            oprot.writeListEnd();
+          }
+        }
+        oprot.writeMapEnd();
+      }
+      oprot.writeFieldEnd();
+    }
+    if (this.cfmapdel != null) {
+      field.name = "cfmapdel";
+      field.type = TType.MAP;
+      field.id = 4;
+      oprot.writeFieldBegin(field);
+      {
+        oprot.writeMapBegin(new TMap(TType.STRING, TType.LIST, this.cfmapdel.size()));
+        for (String _iter16 : this.cfmapdel.keySet())        {
+          oprot.writeString(_iter16);
+          {
+            oprot.writeListBegin(new TList(TType.STRUCT, this.cfmapdel.get(_iter16).size()));
+            for (column_t _iter17 : this.cfmapdel.get(_iter16))            {
+              _iter17.write(oprot);
+            }
+            oprot.writeListEnd();
+          }
+        }
+        oprot.writeMapEnd();
+      }
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("batch_mutation_t(");
+    sb.append("table:");
+    sb.append(this.table);
+    sb.append(",key:");
+    sb.append(this.key);
+    sb.append(",cfmap:");
+    sb.append(this.cfmap);
+    sb.append(",cfmapdel:");
+    sb.append(this.cfmapdel);
+    sb.append(")");
+    return sb.toString();
+  }
+
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/service/column_t.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/column_t.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/column_t.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/column_t.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,181 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+public class column_t implements TBase, java.io.Serializable {
+  public String columnName;
+  public String value;
+  public long timestamp;
+
+  public final Isset __isset = new Isset();
+  public static final class Isset implements java.io.Serializable {
+    public boolean columnName = false;
+    public boolean value = false;
+    public boolean timestamp = false;
+  }
+
+  public column_t() {
+  }
+
+  public column_t(
+    String columnName,
+    String value,
+    long timestamp)
+  {
+    this();
+    this.columnName = columnName;
+    this.__isset.columnName = true;
+    this.value = value;
+    this.__isset.value = true;
+    this.timestamp = timestamp;
+    this.__isset.timestamp = true;
+  }
+
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof column_t)
+      return this.equals((column_t)that);
+    return false;
+  }
+
+  public boolean equals(column_t that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_columnName = true && (this.columnName != null);
+    boolean that_present_columnName = true && (that.columnName != null);
+    if (this_present_columnName || that_present_columnName) {
+      if (!(this_present_columnName && that_present_columnName))
+        return false;
+      if (!this.columnName.equals(that.columnName))
+        return false;
+    }
+
+    boolean this_present_value = true && (this.value != null);
+    boolean that_present_value = true && (that.value != null);
+    if (this_present_value || that_present_value) {
+      if (!(this_present_value && that_present_value))
+        return false;
+      if (!this.value.equals(that.value))
+        return false;
+    }
+
+    boolean this_present_timestamp = true;
+    boolean that_present_timestamp = true;
+    if (this_present_timestamp || that_present_timestamp) {
+      if (!(this_present_timestamp && that_present_timestamp))
+        return false;
+      if (this.timestamp != that.timestamp)
+        return false;
+    }
+
+    return true;
+  }
+
+  public int hashCode() {
+    return 0;
+  }
+
+  public void read(TProtocol iprot) throws TException {
+    TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == TType.STOP) { 
+        break;
+      }
+      switch (field.id)
+      {
+        case 1:
+          if (field.type == TType.STRING) {
+            this.columnName = iprot.readString();
+            this.__isset.columnName = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2:
+          if (field.type == TType.STRING) {
+            this.value = iprot.readString();
+            this.__isset.value = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 3:
+          if (field.type == TType.I64) {
+            this.timestamp = iprot.readI64();
+            this.__isset.timestamp = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          TProtocolUtil.skip(iprot, field.type);
+          break;
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+  }
+
+  public void write(TProtocol oprot) throws TException {
+    TStruct struct = new TStruct("column_t");
+    oprot.writeStructBegin(struct);
+    TField field = new TField();
+    if (this.columnName != null) {
+      field.name = "columnName";
+      field.type = TType.STRING;
+      field.id = 1;
+      oprot.writeFieldBegin(field);
+      oprot.writeString(this.columnName);
+      oprot.writeFieldEnd();
+    }
+    if (this.value != null) {
+      field.name = "value";
+      field.type = TType.STRING;
+      field.id = 2;
+      oprot.writeFieldBegin(field);
+      oprot.writeString(this.value);
+      oprot.writeFieldEnd();
+    }
+    field.name = "timestamp";
+    field.type = TType.I64;
+    field.id = 3;
+    oprot.writeFieldBegin(field);
+    oprot.writeI64(this.timestamp);
+    oprot.writeFieldEnd();
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("column_t(");
+    sb.append("columnName:");
+    sb.append(this.columnName);
+    sb.append(",value:");
+    sb.append(this.value);
+    sb.append(",timestamp:");
+    sb.append(this.timestamp);
+    sb.append(")");
+    return sb.toString();
+  }
+
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/service/superColumn_t.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/superColumn_t.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/superColumn_t.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/superColumn_t.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,168 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+/**
+ * Thrift struct with two optional fields: {@code name} (field id 1, string) and
+ * {@code columns} (field id 2, list of {@link column_t}).
+ *
+ * <p>Instances are mutable through their public fields. {@link #hashCode()} is computed
+ * from the current field values, so an instance must not be modified while it is stored
+ * in a hash-based collection.
+ */
+public class superColumn_t implements TBase, java.io.Serializable {
+  public String name;
+  public List<column_t> columns;
+
+  /** One flag per field; set to true by the all-args constructor and by {@link #read}. */
+  public final Isset __isset = new Isset();
+  public static final class Isset implements java.io.Serializable {
+    public boolean name = false;
+    public boolean columns = false;
+  }
+
+  /** Creates an instance with both fields {@code null} and both {@code __isset} flags false. */
+  public superColumn_t() {
+  }
+
+  /**
+   * Creates an instance with both fields assigned. Both {@code __isset} flags are set to
+   * true, including for arguments that are {@code null}.
+   */
+  public superColumn_t(
+    String name,
+    List<column_t> columns)
+  {
+    this();
+    this.name = name;
+    this.__isset.name = true;
+    this.columns = columns;
+    this.__isset.columns = true;
+  }
+
+  /** Returns true only when {@code that} is a superColumn_t with equal fields. */
+  @Override
+  public boolean equals(Object that) {
+    // instanceof is false for null, so no separate null check is needed.
+    if (that instanceof superColumn_t)
+      return this.equals((superColumn_t)that);
+    return false;
+  }
+
+  /**
+   * Field-by-field comparison of the two data fields. Two {@code null} values are
+   * considered equal; the {@code __isset} flags are not compared.
+   */
+  public boolean equals(superColumn_t that) {
+    if (that == null)
+      return false;
+
+    if (this.name == null ? that.name != null : !this.name.equals(that.name))
+      return false;
+    if (this.columns == null ? that.columns != null : !this.columns.equals(that.columns))
+      return false;
+
+    return true;
+  }
+
+  /**
+   * Hash over the same two fields that {@link #equals(superColumn_t)} compares, so equal
+   * instances always produce equal hashes. A {@code null} field contributes 0.
+   */
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31 * result + (this.name == null ? 0 : this.name.hashCode());
+    result = 31 * result + (this.columns == null ? 0 : this.columns.hashCode());
+    return result;
+  }
+
+  /**
+   * Populates this struct from {@code iprot}, reading fields until a STOP field is seen.
+   * A field with an unknown id, or a known id carrying an unexpected wire type, is skipped.
+   * Each field that is read also sets its {@code __isset} flag.
+   *
+   * @throws TException propagated from the protocol calls
+   */
+  public void read(TProtocol iprot) throws TException {
+    TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == TType.STOP) {
+        break;
+      }
+      switch (field.id)
+      {
+        case 1:
+          if (field.type == TType.STRING) {
+            this.name = iprot.readString();
+            this.__isset.name = true;
+          } else {
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2:
+          if (field.type == TType.LIST) {
+            {
+              TList listHeader = iprot.readListBegin();
+              this.columns = new ArrayList<column_t>(listHeader.size);
+              for (int i = 0; i < listHeader.size; ++i)
+              {
+                // One allocation per element; the element fills itself from the protocol.
+                column_t element = new column_t();
+                element.read(iprot);
+                this.columns.add(element);
+              }
+              iprot.readListEnd();
+            }
+            this.__isset.columns = true;
+          } else {
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          TProtocolUtil.skip(iprot, field.type);
+          break;
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+  }
+
+  /**
+   * Serializes this struct to {@code oprot}. Fields that are {@code null} are omitted;
+   * the {@code __isset} flags are not consulted.
+   *
+   * @throws TException propagated from the protocol calls
+   */
+  public void write(TProtocol oprot) throws TException {
+    TStruct struct = new TStruct("superColumn_t");
+    oprot.writeStructBegin(struct);
+    TField field = new TField();
+    if (this.name != null) {
+      field.name = "name";
+      field.type = TType.STRING;
+      field.id = 1;
+      oprot.writeFieldBegin(field);
+      oprot.writeString(this.name);
+      oprot.writeFieldEnd();
+    }
+    if (this.columns != null) {
+      field.name = "columns";
+      field.type = TType.LIST;
+      field.id = 2;
+      oprot.writeFieldBegin(field);
+      {
+        oprot.writeListBegin(new TList(TType.STRUCT, this.columns.size()));
+        for (column_t column : this.columns)
+        {
+          column.write(oprot);
+        }
+        oprot.writeListEnd();
+      }
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  /** Renders the struct as {@code superColumn_t(name:..,columns:..)}. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("superColumn_t(");
+    sb.append("name:");
+    sb.append(this.name);
+    sb.append(",columns:");
+    sb.append(this.columns);
+    sb.append(")");
+    return sb.toString();
+  }
+
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/test/DBTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/DBTest.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/DBTest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/DBTest.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test;
+
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Scanner;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.mapreduce.SequentialScanner;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.LogUtil;
+
+
+public class DBTest
+{
+    private static void doWrites() throws Throwable
+    {         
+        for ( int i = 0; i < 512*1024; ++i )
+        {
+            String key = Integer.toString(i);
+            RowMutation rm = new RowMutation("Mailbox", key);
+            String value = "Data for key " + key;
+            rm.add("Test:" + "Column", value.getBytes(), i);                
+            rm.apply();
+        }
+        System.out.println("Write done");
+    }
+    
+    private static void doReads() throws Throwable
+    {
+        Table table = Table.open("Mailbox");
+        for ( int i = 100; i < 1000; ++i )
+        {        
+            String key = Integer.toString(i);
+            Row row = table.getRow(key, "Test");
+            System.out.println( row.getColumnFamily("Test") );
+            System.out.println("Row read done");            
+            ColumnFamily cf = table.get(key, "Test");                                  
+            if (cf == null)
+                System.out.println("KEY " + key + " is missing");
+            else
+            {
+                Collection<IColumn> superColumns = cf.getAllColumns();                
+                System.out.println("Success ...");
+            }
+        }
+        System.out.println("Read done ...");  
+    }
+    
+    private static void doRead(String key) throws Throwable
+    {
+        Table table = Table.open("Mailbox");
+        Row row = table.getRow(key, "Test");    
+        ColumnFamily cf = table.get(key, "Test");                                  
+        if (cf == null)
+            System.out.println("KEY " + key + " is missing");
+        else
+        {
+            Collection<IColumn> columns = cf.getAllColumns();                
+            for ( IColumn column : columns )
+            {
+                System.out.println(column.name());
+                System.out.println( new String( column.value() ) );
+            }
+        }
+        System.out.println("Read done ...");
+    }
+    
+    private static void doScannerTest() throws Throwable
+    {
+        Scanner scanner = new Scanner("Mailbox");
+        scanner.fetch(Integer.toString(105), "MailboxMailList0");
+        
+        while ( scanner.hasNext() )
+        {
+            System.out.println(scanner.next().name());
+        }             
+    }
+    
+    private static void doSequentialScannerTest() throws Throwable
+    {
+        SequentialScanner scanner = new SequentialScanner("Mailbox");
+        while ( scanner.hasNext() )
+        {
+            Row row = scanner.next();  
+            System.out.println( row.getColumnFamily("Test") );
+            System.out.println( row.getColumnFamily("Test2") );
+        }
+    }
+    
+    public static void doTest()
+    {
+        String host = "insearch00";
+        String host2 = "insearch0";
+        Set<EndPoint> allNodes = new HashSet<EndPoint>();
+        for ( int i = 1; i <= 3; ++i )
+        {
+            if ( i < 10 )
+                allNodes.add( new EndPoint(host + i + ".sf2p.facebook.com", 7000) );
+            else
+                allNodes.add( new EndPoint(host2 + i + ".sf2p.facebook.com", 7000) );
+        }
+        
+        for ( int i = 1; i <= 2; ++i )
+        {
+            if ( i < 10 )
+                allNodes.add( new EndPoint(host + i + ".ash1.facebook.com", 7000) );
+            else
+                allNodes.add( new EndPoint(host2 + i + ".ash1.facebook.com", 7000) );
+        }
+        
+        TestChoice t = new TestChoice(allNodes);
+        t.assignReplicas();
+    }
+    
+    public static void main(String[] args) throws Throwable
+    {
+        /*
+        SSTable ssTable = new SSTable("C:\\Engagements\\", "Sample-Bf");
+        BloomFilter bf = new BloomFilter(512*1024, 15);
+        for ( int i = 0; i < 512*1024; ++i )
+        {
+            bf.fill( Integer.toString(i) );
+        }        
+        ssTable.close(bf);
+        */
+        /*
+        IFileWriter writer = SequenceFile.bufferedWriter("C:\\Engagements\\Sample-Bf-Data.db", 4*1024*1024);
+        BloomFilter bf = new BloomFilter(512*1024, 15);
+        for ( int i = 0; i < 512*1024; ++i )
+        {
+            bf.fill( Integer.toString(i) );
+        }
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        BloomFilter.serializer().serialize(bf, bufOut);
+        bufOut.close();
+        writer.close(bufOut.getData(), bufOut.getLength());
+        writer.close();        
+        
+        IFileReader reader = SequenceFile.bufferedReader("C:\\Engagements\\Sample-Bf-Data.db", 4*1024*1024);
+        //DataOutputBuffer bufOut = new DataOutputBuffer();
+        bufOut.reset();
+        reader.next(bufOut);
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(bufOut.getData(), bufOut.getLength());
+        bufIn.readUTF();
+        bufIn.readInt();
+        BloomFilter bf2 = BloomFilter.serializer().deserialize(bufIn);
+        int count = 0;
+        for ( int i = 0; i < 512*1024; ++i )
+        {
+            if ( !bf2.isPresent(Integer.toString(i)) )
+                ++count;
+        }
+        System.out.println(count);
+        reader.close();
+        */
+        //LogUtil.init();
+        //StorageService.instance().start(); 
+        //doWrites();
+        //doRead("543");
+        
+        DatabaseDescriptor.init();
+        DBTest.doTest();
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/test/DataImporter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/DataImporter.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/DataImporter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/DataImporter.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,1631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test;
+
+import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TSocket;
+import com.facebook.thrift.transport.THttpClient;
+import com.facebook.thrift.transport.TFramedTransport;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadResponseMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.Cassandra;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.batch_mutation_super_t;
+import org.apache.cassandra.service.batch_mutation_t;
+import org.apache.cassandra.service.column_t;
+import org.apache.cassandra.service.superColumn_t;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.Token;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DataImporter {
+	/** Separator used to split one input record into fields. */
+	private static final String delimiter_ = ",";
+
+	private static Logger logger_ = Logger.getLogger(DataImporter.class);
+
+	/** Table name passed to the single-column insert in test(). */
+	private static final String tablename_ = "Mailbox";
+
+	/** Source endpoint stamped on the messages built by Task. */
+	public static EndPoint from_ = new EndPoint("172.21.211.181", 10001);
+
+	/** Destination endpoint the messages built by Task are sent to. */
+	public static EndPoint to_ = new EndPoint("hadoop071.sf2p.facebook.com",
+			7000);
+
+	public static EndPoint[] tos_ = new EndPoint[]{ new EndPoint("hadoop038.sf2p.facebook.com",	7000),
+													new EndPoint("hadoop039.sf2p.facebook.com", 7000),
+													new EndPoint("hadoop040.sf2p.facebook.com", 7000),
+													new EndPoint("hadoop041.sf2p.facebook.com",	7000)
+												};
+	/** Column family prefix that test() puts in front of the column name. */
+	private static final String columnFamily_ = "MailboxUserList";
+
+	/** Thrift client; it stays null until apply() replaces it with the result of connect(). */
+	private Cassandra.Client peerstorageClient_ = null;
+
+	/**
+	 * Parses one delimited record and inserts it as a single column through
+	 * the Thrift client.
+	 *
+	 * Fields are read by position: fields 0 and 1 are joined with ':' to form
+	 * the row key, field 2 is the column name, field 3 the integer timestamp
+	 * and field 4 the column value. Any further fields are ignored. A failure
+	 * of the insert call is printed to standard error and not propagated.
+	 *
+	 * @param line the record to import
+	 * @throws IOException declared for callers; nothing in this method raises it
+	 * @throws NumberFormatException if field 3 is not an integer
+	 */
+	public void test(String line) throws IOException {
+		StringTokenizer st = new StringTokenizer(line, delimiter_);
+		StringBuilder sb = new StringBuilder("");
+		int i = 0;
+		String column = null;
+		int ts = 0;
+		String columnValue = null;
+
+		while (st.hasMoreElements()) {
+			switch (i) {
+			case 0:
+				sb.append((String) st.nextElement());
+				sb.append(":");
+				break;
+
+			case 1:
+				sb.append((String) st.nextElement());
+				break;
+
+			case 2:
+				column = (String) st.nextElement();
+				break;
+
+			case 3:
+				ts = Integer.parseInt((String) st.nextElement());
+				break;
+
+			case 4:
+				columnValue = (String) st.nextElement();
+				break;
+
+			default:
+				// Surplus fields must still be consumed; otherwise
+				// hasMoreElements() stays true and this loop never ends.
+				st.nextElement();
+				break;
+			}
+			++i;
+		}
+
+		String rowKey = sb.toString();
+		try {
+			long t = System.currentTimeMillis();
+			/* The thrift call to storage. */
+			peerstorageClient_.insert(tablename_, rowKey, columnFamily_ + ":"
+					+ column, columnValue, ts);
+			logger_.debug("Time taken for thrift..."
+					+ (System.currentTimeMillis() - t));
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+
+	/** Counter advanced modulo 4 by every connect() call. */
+	private int roundRobin_ = 0 ;
+    private Random random_ = new Random();
+    
+	/**
+	 * Sends one batch of column mutations through the Thrift client,
+	 * throttled to roughly requestsPerSecond_ calls per second.
+	 *
+	 * Every call first increments columnFamilyHack_. If anything in the first
+	 * attempt fails (including the client still being null), a new connection
+	 * is opened with connect() and the batch is sent once more; a failure of
+	 * that retry is printed to standard error and not propagated.
+	 *
+	 * @param batchMutation the batch to send
+	 */
+	public void apply(batch_mutation_t batchMutation) {
+
+		columnFamilyHack_++;
+		try {
+			// Sleep for 1000/requestsPerSecond_ milliseconds. The remainder of
+			// that division is a fraction of a millisecond, so it is scaled to
+			// nanoseconds instead of being passed on as a raw nanosecond count.
+			int wholeMillis = 1000 / requestsPerSecond_;
+			int extraNanos = (1000 % requestsPerSecond_) * 1000000 / requestsPerSecond_;
+			Thread.sleep(wholeMillis, extraNanos);
+			long t = System.currentTimeMillis();
+			peerstorageClient_.batch_insert(batchMutation);
+			logger_.debug("Time taken for thrift..."
+					+ (System.currentTimeMillis() - t));
+		} catch (Exception e) {
+			// Record why the first attempt failed before retrying on a fresh connection.
+			logger_.debug("batch_insert failed, reconnecting and retrying once", e);
+			try {
+				peerstorageClient_ = connect();
+				peerstorageClient_.batch_insert(batchMutation);
+			} catch (Exception e1) {
+				e1.printStackTrace();
+			}
+		}
+	}
+
+	/**
+	 * Sends one batch of super-column mutations through the Thrift client,
+	 * throttled to roughly requestsPerSecond_ calls per second.
+	 *
+	 * Every call first increments columnFamilyHack_. If anything in the first
+	 * attempt fails (including the client still being null), a new connection
+	 * is opened with connect() and the batch is sent once more; a failure of
+	 * that retry is printed to standard error and not propagated.
+	 *
+	 * @param batchMutation the batch to send
+	 */
+	public void apply(batch_mutation_super_t batchMutation) {
+
+		columnFamilyHack_++;
+		try {
+			// Sleep for 1000/requestsPerSecond_ milliseconds. The remainder of
+			// that division is a fraction of a millisecond, so it is scaled to
+			// nanoseconds instead of being passed on as a raw nanosecond count.
+			int wholeMillis = 1000 / requestsPerSecond_;
+			int extraNanos = (1000 % requestsPerSecond_) * 1000000 / requestsPerSecond_;
+			Thread.sleep(wholeMillis, extraNanos);
+			long t = System.currentTimeMillis();
+			peerstorageClient_.batch_insert_superColumn(batchMutation);
+			logger_.debug("Time taken for thrift..."
+					+ (System.currentTimeMillis() - t));
+		} catch (Exception e) {
+			// Record why the first attempt failed before retrying on a fresh connection.
+			logger_.debug("batch_insert_superColumn failed, reconnecting and retrying once", e);
+			try {
+				peerstorageClient_ = connect();
+				peerstorageClient_.batch_insert_superColumn(batchMutation);
+			} catch (Exception e1) {
+				e1.printStackTrace();
+			}
+		}
+	}
+	TTransport transport_ = null;
+	public Cassandra.Client connect() {
+//		String host = "hadoop034.sf2p.facebook.com";
+		String[] hosts = new String[] { "hadoop038.sf2p.facebook.com",
+				"hadoop039.sf2p.facebook.com",
+				"hadoop040.sf2p.facebook.com",
+				"hadoop041.sf2p.facebook.com"
+			  };
+		int port = 9160;
+            
+		//TNonBlockingSocket socket = new TNonBlockingSocket(hosts[roundRobin_], port);
+		TSocket socket = new TSocket("hadoop071.sf2p.facebook.com", port); 
+		roundRobin_ = (roundRobin_+1)%4;
+		if(transport_ != null)
+			transport_.close();
+		transport_ = socket;
+
+		TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport_, false,
+				false);
+		Cassandra.Client peerstorageClient = new Cassandra.Client(
+				binaryProtocol);
+		try
+		{
+			transport_.open();
+		}
+		catch(Exception e)
+		{
+			e.printStackTrace();
+		}
+		return peerstorageClient;
+	}
+    
+    /**
+     * Tells whether the given text parses as a decimal int.
+     *
+     * @param str the text to check
+     * @return true if Integer.parseInt accepts str, false if it rejects it
+     */
+    private static boolean isNumeric(String str)
+    {
+        boolean parsed;
+        try
+        {
+            Integer.parseInt(str);
+            parsed = true;
+        }
+        catch (NumberFormatException rejected)
+        {
+            parsed = false;
+        }
+        return parsed;
+    }
+
+    int columnFamilyHack_ = 0 ;
+    public int  divideby_ = 4;
+
+	public void testBatchRunner(String filepath) throws IOException {
+		BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+				new FileInputStream(filepath)), 16 * 1024 * 1024);
+		String line = null;
+		String delimiter_ = new String(",");
+		String firstuser = null;
+		String nextuser = null;
+		batch_mutation_t rmInbox = null;
+		batch_mutation_t rmOutbox = null;
+		while ((line = bufReader.readLine()) != null) {
+			StringTokenizer st = new StringTokenizer(line, delimiter_);
+			int i = 0;
+			String threadId = null;
+			int lastUpdated = 0;
+			int isDeleted = 0;
+			int folder = 0;
+			int uid =0;
+			String user = null;
+			while (st.hasMoreElements()) {
+				switch (i) {
+				case 0:
+					user = (String) st.nextElement();// sb.append((String)st.nextElement());
+                    if ( !isNumeric(user))
+                        continue;
+					
+					break;
+
+				case 1:
+					folder = Integer.parseInt((String) st.nextElement());// sb.append((String)st.nextElement());
+					break;
+
+				case 2:
+					threadId = (String) st.nextElement();
+					break;
+
+				case 3:
+					lastUpdated = Integer.parseInt((String) st.nextElement());
+					break;
+
+				case 4:
+					isDeleted = Integer.parseInt((String) st.nextElement());// (String)st.nextElement();
+					break;
+
+				default:
+					break;
+				}
+				++i;
+			}
+
+			nextuser = user;
+			if (firstuser == null || firstuser.compareTo(nextuser) != 0) {
+				firstuser = nextuser;
+				if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+/*					fos_.write(rmInbox.key.getBytes());
+					fos_.write( System.getProperty("line.separator").getBytes());
+	                counter_.incrementAndGet();
+*/					apply(rmInbox);
+				}
+				if (rmOutbox != null && !rmOutbox.cfmap.isEmpty()) {
+/*					fos_.write(rmOutbox.key.getBytes());
+					fos_.write( System.getProperty("line.separator").getBytes());
+	                counter_.incrementAndGet();
+*/					apply(rmOutbox);
+				}
+				rmInbox = new batch_mutation_t();
+				rmInbox.table = "Mailbox";
+				rmInbox.key = firstuser + ":0";
+				rmInbox.cfmap = new HashMap<String, List<column_t>>();
+
+				rmOutbox = new batch_mutation_t();
+				rmOutbox.table = "Mailbox";
+				rmOutbox.key = firstuser + ":1";
+				rmOutbox.cfmap = new HashMap<String, List<column_t>>();
+			}
+			column_t columnData = new column_t();
+			columnData.columnName = threadId;
+			columnData.value = String.valueOf(isDeleted);
+			columnData.timestamp = lastUpdated;
+			// List <MboxStruct> list = userthreadmap.get(rs.getString(1));
+			if (folder == 0) {
+				List<column_t> list = rmInbox.cfmap.get("MailboxUserList"+(columnFamilyHack_%divideby_));
+				if (list == null) {
+					list = new ArrayList<column_t>();
+					rmInbox.cfmap.put("MailboxUserList"+(columnFamilyHack_%divideby_), list);
+				}
+                //if(list.size() < 500)
+                    list.add(columnData);
+			} else {
+				List<column_t> list = rmOutbox.cfmap
+						.get("MailboxUserList"+(columnFamilyHack_%divideby_));
+				if (list == null) {
+					list = new ArrayList<column_t>();
+					rmOutbox.cfmap.put("MailboxUserList"+(columnFamilyHack_%divideby_), list);
+				}
+                //if(list.size() < 500)
+                    list.add(columnData);
+			}
+		}
+		if (firstuser != null) {
+			if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+/*				fos_.write(rmInbox.key.getBytes());
+				fos_.write( System.getProperty("line.separator").getBytes());
+                counter_.incrementAndGet();
+*/				apply(rmInbox);
+			}
+			if (rmOutbox != null && !rmOutbox.cfmap.isEmpty()) {
+/*				fos_.write(rmOutbox.key.getBytes());
+				fos_.write( System.getProperty("line.separator").getBytes());
+                counter_.incrementAndGet();
+*/				apply(rmOutbox);
+			}
+		}
+		/* Added the thrift call to storage. */
+	}
+
+
+	/**
+	 * Reads a comma-delimited file and sends its rows to the "Mailbox" table
+	 * as one batch per user.
+	 *
+	 * Fields are read by position: user, folder, thread id, last-updated
+	 * value, deleted flag; further fields are ignored and the folder is
+	 * parsed but not used. Consecutive lines of the same user are collected
+	 * into one batch keyed by the user, which is handed to apply() when the
+	 * user changes and once more at end of file. The thread id becomes the
+	 * column name, the deleted flag the value and the last-updated field the
+	 * timestamp. Lines without any field are skipped.
+	 *
+	 * @param filepath path of the file to import
+	 * @throws IOException if the file cannot be opened or read
+	 * @throws NumberFormatException if a numeric field does not parse
+	 */
+	public void testMailboxBatchRunner(String filepath) throws IOException {
+		BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+				new FileInputStream(filepath)), 16 * 1024 * 1024);
+		try {
+			String line = null;
+			String firstuser = null;
+			batch_mutation_t rmInbox = null;
+			while ((line = bufReader.readLine()) != null) {
+				StringTokenizer st = new StringTokenizer(line, ",");
+				int i = 0;
+				String threadId = null;
+				int lastUpdated = 0;
+				int isDeleted = 0;
+				String user = null;
+				while (st.hasMoreElements()) {
+					switch (i) {
+					case 0:
+						user = (String) st.nextElement();
+						// A non-numeric token is not counted as a field: the
+						// next token is tried as the user again.
+						if (!isNumeric(user))
+							continue;
+						break;
+
+					case 1:
+						// The folder is validated as an integer but otherwise unused here.
+						Integer.parseInt((String) st.nextElement());
+						break;
+
+					case 2:
+						threadId = (String) st.nextElement();
+						break;
+
+					case 3:
+						lastUpdated = Integer.parseInt((String) st.nextElement());
+						break;
+
+					case 4:
+						isDeleted = Integer.parseInt((String) st.nextElement());
+						break;
+
+					default:
+						// Surplus fields must still be consumed; otherwise
+						// hasMoreElements() stays true and this loop never ends.
+						st.nextElement();
+						break;
+					}
+					++i;
+				}
+
+				if (user == null) {
+					// Blank line: there is no user to key a batch by.
+					continue;
+				}
+
+				if (firstuser == null || !firstuser.equals(user)) {
+					// The user changed: send what was collected for the previous one.
+					if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+						apply(rmInbox);
+					}
+					firstuser = user;
+					rmInbox = new batch_mutation_t();
+					rmInbox.table = "Mailbox";
+					rmInbox.key = firstuser;
+					rmInbox.cfmap = new HashMap<String, List<column_t>>();
+				}
+
+				column_t columnData = new column_t();
+				columnData.columnName = threadId;
+				columnData.value = String.valueOf(isDeleted);
+				columnData.timestamp = lastUpdated;
+
+				String cfName = "MailboxMailList" + (columnFamilyHack_ % divideby_);
+				List<column_t> list = rmInbox.cfmap.get(cfName);
+				if (list == null) {
+					list = new ArrayList<column_t>();
+					rmInbox.cfmap.put(cfName, list);
+				}
+				list.add(columnData);
+			}
+
+			// Send whatever is still pending for the last user.
+			if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+				apply(rmInbox);
+			}
+		} finally {
+			bufReader.close();
+		}
+	}
+
+	public void testSuperBatchRunner(String filepath) throws IOException {
+		BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+				new FileInputStream(filepath)), 16 * 1024 * 1024);
+		String line = null;
+		String delimiter_ = new String(",");
+		String firstuser = null;
+		String nextuser = null;
+		batch_mutation_super_t rmInbox = null;
+		batch_mutation_super_t rmOutbox = null;
+		while ((line = bufReader.readLine()) != null) {
+			StringTokenizer st = new StringTokenizer(line, delimiter_);
+			int i = 0;
+			String threadId = null;
+			int lastUpdated = 0;
+			int isDeleted = 0;
+			int folder = 0;
+			int uid =0;
+			String user = null;
+			String subject = null;
+			String body = null;
+			while (st.hasMoreElements()) {
+				switch (i) {
+				case 0:
+					user = (String) st.nextElement();// sb.append((String)st.nextElement());
+                    if ( !isNumeric(user))
+                        continue;
+					
+					break;
+
+				case 1:
+					folder = Integer.parseInt((String) st.nextElement());// sb.append((String)st.nextElement());
+					break;
+
+				case 2:
+					threadId = (String) st.nextElement();
+					break;
+
+				case 3:
+					lastUpdated = Integer.parseInt((String) st.nextElement());
+					break;
+
+				case 4:
+					isDeleted = Integer.parseInt((String) st.nextElement());// (String)st.nextElement();
+					break;
+
+				case 5:
+					st.nextElement();
+					break;
+
+				case 6:
+					st.nextElement();
+					break;
+
+				case 7:
+					subject = (String) st.nextElement();
+					break;
+
+				case 8:
+					body = (String) st.nextElement();
+					break;
+				default:
+					st.nextElement();
+					break;
+				}
+				++i;
+			}
+
+			nextuser = user;
+			if (firstuser == null || firstuser.compareTo(nextuser) != 0) {
+				firstuser = nextuser;
+				if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+					fos_.write(rmInbox.key.getBytes());
+					fos_.write( System.getProperty("line.separator").getBytes());
+	                counter_.incrementAndGet();
+					apply(rmInbox);
+				}
+				if (rmOutbox != null && !rmOutbox.cfmap.isEmpty()) {
+					fos_.write(rmOutbox.key.getBytes());
+					fos_.write( System.getProperty("line.separator").getBytes());
+	                counter_.incrementAndGet();
+					apply(rmOutbox);
+				}
+				rmInbox = new batch_mutation_super_t();
+				rmInbox.table = "Mailbox";
+				rmInbox.key = firstuser ;//+ ":0";
+				rmInbox.cfmap = new HashMap<String, List<superColumn_t>>();
+
+				rmOutbox = new batch_mutation_super_t();
+				rmOutbox.table = "Mailbox";
+				rmOutbox.key = firstuser ;//+ ":1";
+				rmOutbox.cfmap = new HashMap<String, List<superColumn_t>>();
+			}
+			column_t columnData = new column_t();
+			columnData.columnName = threadId;
+			columnData.value = String.valueOf(isDeleted);
+			columnData.timestamp = lastUpdated;
+			// List <MboxStruct> list = userthreadmap.get(rs.getString(1));
+			if (folder == 0) {
+				List<superColumn_t> list = rmInbox.cfmap.get("MailboxThreadList"+(columnFamilyHack_%divideby_));
+				if (list == null) {
+					list = new ArrayList<superColumn_t>();
+					rmInbox.cfmap.put("MailboxThreadList"+(columnFamilyHack_%divideby_), list);
+				}
+				if( subject == null)
+					subject = "";
+				if( body == null ) 
+					body = "";
+				List<String> tokenList  = tokenize(subject + " " + body);
+				for(String token : tokenList)
+				{
+					superColumn_t superColumn = new superColumn_t();
+					superColumn.name = token;
+					superColumn.columns = new ArrayList<column_t>();
+					superColumn.columns.add(columnData);
+					list.add(superColumn);
+				}
+			} else {
+				List<superColumn_t> list = rmOutbox.cfmap.get("MailboxThreadList"+(columnFamilyHack_%divideby_));
+				if (list == null) {
+					list = new ArrayList<superColumn_t>();
+					rmOutbox.cfmap.put("MailboxThreadList"+(columnFamilyHack_%divideby_), list);
+				}
+				if( subject == null)
+					subject = "";
+				if( body == null ) 
+					body = "";
+				List<String> tokenList  = tokenize(subject + " " + body);
+				for(String token : tokenList)
+				{
+					superColumn_t superColumn = new superColumn_t();
+					superColumn.name = token;
+					superColumn.columns = new ArrayList<column_t>();
+					superColumn.columns.add(columnData);
+					list.add(superColumn);
+				}
+			}
+		}
+		if (firstuser != null) {
+			if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+				fos_.write(rmInbox.key.getBytes());
+				fos_.write( System.getProperty("line.separator").getBytes());
+                counter_.incrementAndGet();
+				apply(rmInbox);
+			}
+			if (rmOutbox != null && !rmOutbox.cfmap.isEmpty()) {
+				fos_.write(rmOutbox.key.getBytes());
+				fos_.write( System.getProperty("line.separator").getBytes());
+                counter_.incrementAndGet();
+				apply(rmOutbox);
+			}
+		}
+		/* Added the thrift call to storage. */
+	}
+	
+    /**
+     * Splits a string on the given delimiter characters.
+     *
+     * @param str   the text to split
+     * @param delim the delimiter characters, as understood by StringTokenizer
+     * @return the tokens in order of appearance; empty tokens are not reported
+     */
+    public static String[] getTokens(String str, String delim)
+    {
+        StringTokenizer tokenizer = new StringTokenizer(str, delim);
+        final int count = tokenizer.countTokens();
+        String[] tokens = new String[count];
+        for ( int idx = 0; idx < count; ++idx )
+        {
+            tokens[idx] = tokenizer.nextToken();
+        }
+        return tokens;
+    }
+
+    /**
+     * Tells whether the given user appears in the list.
+     *
+     * @param user the user to look for
+     * @param list the candidates, compared with equals()
+     * @return true if at least one entry of list equals user
+     */
+    public static boolean checkUser(String user, String[] list)
+    {
+        for ( int idx = 0; idx < list.length; ++idx )
+        {
+            if ( user.equals(list[idx]) )
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    public void testSuperUserBatchRunner(String filepath) throws IOException {
+		BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+				new FileInputStream(filepath)), 16 * 1024 * 1024);
+		String line = null;
+		String delimiter_ = new String(",");
+		String firstuser = null;
+		String nextuser = null;
+		batch_mutation_super_t rmInbox = null;
+		batch_mutation_super_t rmOutbox = null;
+		while ((line = bufReader.readLine()) != null) {
+			StringTokenizer st = new StringTokenizer(line, delimiter_);
+			int i = 0;
+			String threadId = null;
+			int lastUpdated = 0;
+			int isDeleted = 0;
+			int folder = 0;
+			int uid =0;
+			String user = null;
+			String subject = null;
+			String body = null;
+			String authors = null;
+			String participants = null;
+			
+			while (st.hasMoreElements()) {
+				switch (i) {
+				case 0:
+					user = (String) st.nextElement();// sb.append((String)st.nextElement());
+                    if ( !isNumeric(user))
+                        continue;
+					
+					break;
+
+				case 1:
+					folder = Integer.parseInt((String) st.nextElement());// sb.append((String)st.nextElement());
+					break;
+
+				case 2:
+					threadId = (String) st.nextElement();
+					break;
+
+				case 3:
+					lastUpdated = Integer.parseInt((String) st.nextElement());
+					break;
+
+				case 4:
+					isDeleted = Integer.parseInt((String) st.nextElement());// (String)st.nextElement();
+					break;
+
+				case 5:
+					authors = (String) st.nextElement();
+					break;
+
+				case 6:
+					participants = (String)st.nextElement();
+					break;
+
+				case 7:
+					subject = (String) st.nextElement();
+					break;
+
+				case 8:
+					body = (String) st.nextElement();
+					break;
+				default:
+					st.nextElement();
+					break;
+				}
+				++i;
+			}
+
+			nextuser = user;
+			if (firstuser == null || firstuser.compareTo(nextuser) != 0) {
+				firstuser = nextuser;
+				if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+					fos_.write(rmInbox.key.getBytes());
+					fos_.write( System.getProperty("line.separator").getBytes());
+	                counter_.incrementAndGet();
+					apply(rmInbox);
+				}
+				if (rmOutbox != null && !rmOutbox.cfmap.isEmpty()) {
+					fos_.write(rmOutbox.key.getBytes());
+					fos_.write( System.getProperty("line.separator").getBytes());
+	                counter_.incrementAndGet();
+					apply(rmOutbox);
+				}
+				rmInbox = new batch_mutation_super_t();
+				rmInbox.table = "Mailbox";
+				rmInbox.key = firstuser ;//+ ":0";
+				rmInbox.cfmap = new HashMap<String, List<superColumn_t>>();
+
+				rmOutbox = new batch_mutation_super_t();
+				rmOutbox.table = "Mailbox";
+				rmOutbox.key = firstuser ;//+ ":1";
+				rmOutbox.cfmap = new HashMap<String, List<superColumn_t>>();
+			}
+			column_t columnData = new column_t();
+			columnData.columnName = threadId;
+			columnData.value = String.valueOf(isDeleted);
+			columnData.timestamp = lastUpdated;
+			// List <MboxStruct> list = userthreadmap.get(rs.getString(1));
+			if (folder == 0) {
+				List<superColumn_t> list = rmInbox.cfmap.get("MailboxUserList"+(columnFamilyHack_%divideby_));
+				if (list == null) {
+					list = new ArrayList<superColumn_t>();
+					rmInbox.cfmap.put("MailboxUserList"+(columnFamilyHack_%divideby_), list);
+				}
+				if( authors == null)
+					authors = "";
+				if( participants == null ) 
+					participants = "";
+				String[] authorList = getTokens(authors,":");
+				String[] partList = getTokens(participants,":");
+				String[] tokenList = null;
+				if(checkUser(user,authorList))
+				{
+					tokenList = partList;
+				}
+				else
+				{
+					tokenList = authorList;
+				}
+				
+				for(String token : tokenList)
+				{
+					superColumn_t superColumn = new superColumn_t();
+					superColumn.name = token;
+					superColumn.columns = new ArrayList<column_t>();
+					superColumn.columns.add(columnData);
+					list.add(superColumn);
+				}
+			} else {
+				List<superColumn_t> list = rmOutbox.cfmap.get("MailboxUserList"+(columnFamilyHack_%divideby_));
+				if (list == null) {
+					list = new ArrayList<superColumn_t>();
+					rmOutbox.cfmap.put("MailboxUserList"+(columnFamilyHack_%divideby_), list);
+				}
+				if( authors == null)
+					authors = "";
+				if( participants == null ) 
+					participants = "";
+				String[] authorList = getTokens(authors,":");
+				String[] partList = getTokens(participants,":");
+				String[] tokenList = null;
+				if(checkUser(user,authorList))
+				{
+					tokenList = partList;
+				}
+				else
+				{
+					tokenList = authorList;
+				}
+				for(String token : tokenList)
+				{
+					superColumn_t superColumn = new superColumn_t();
+					superColumn.name = token;
+					superColumn.columns = new ArrayList<column_t>();
+					superColumn.columns.add(columnData);
+					list.add(superColumn);
+				}
+			}
+		}
+		if (firstuser != null) {
+			if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+				fos_.write(rmInbox.key.getBytes());
+				fos_.write( System.getProperty("line.separator").getBytes());
+                counter_.incrementAndGet();
+				apply(rmInbox);
+			}
+			if (rmOutbox != null && !rmOutbox.cfmap.isEmpty()) {
+				fos_.write(rmOutbox.key.getBytes());
+				fos_.write( System.getProperty("line.separator").getBytes());
+                counter_.incrementAndGet();
+				apply(rmOutbox);
+			}
+		}
+		/* Added the thrift call to storage. */
+	}
+	
+	
+	
+	// Defining these privates here as they make more sense next to the
+	// functions below that use them.
+
+
+	// NOTE(review): not referenced anywhere in the visible part of this file.
+	private int numCreated_ = 0;
+
+	// Thread factory created in the constructor and handed to pool_.
+	ThreadFactory tf_ = null;
+
+	// Scheduled executor created in the constructor.
+	ScheduledExecutorService pool_ = null;
+
+	// Target request rate: the tests sleep 1000/requestsPerSecond_ ms between
+	// requests. run() overrides the default from args[3] when it is present.
+	private int requestsPerSecond_ = 50;
+    // Stream onto "keys.dat", opened in append mode by the constructor and
+    // closed at the end of run().
+    public FileOutputStream fos_ = null;
+    // Counter that run() reports as "Keys sent over".
+    private AtomicInteger counter_ = new AtomicInteger(0);
+    
+	// This is the task that gets scheduled
+	// This could be different for different kind of tasks
+	class Task implements Runnable {
+		/* Mutation message that is sent when this task runs. */
+		RowMutationMessage rmMsg_ = null;
+
+		public Task(RowMutationMessage rmMsg) {
+			this.rmMsg_ = rmMsg;
+		}
+
+		/**
+		 * Bumps the shared counter and sends the wrapped mutation one-way from
+		 * DataImporter.from_ to DataImporter.to_. Any failure is printed and
+		 * otherwise ignored so the task never propagates an exception.
+		 */
+		public void run() {
+			try {
+				counter_.incrementAndGet();
+				Object[] payload = new Object[] { rmMsg_ };
+				Message outbound = new Message(DataImporter.from_,
+						StorageService.mutationStage_,
+						StorageService.mutationVerbHandler_,
+						payload);
+				MessagingService messaging = MessagingService.getMessagingInstance();
+				messaging.sendOneWay(outbound, DataImporter.to_);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
+
+	public DataImporter() throws Throwable {
+		tf_ = new ThreadFactoryImpl("LOAD-GENERATOR");
+		pool_ = new DebuggableScheduledThreadPoolExecutor(100, tf_);
+		fos_ = new FileOutputStream("keys.dat", true);
+	}
+    
+	// Incremented by the read tests whenever a row, column family or column
+	// that was expected is missing.
+	public long errorCount_ = 0;
+
+	// Incremented by the read tests for every lookup they count as performed.
+	public long queryCount_ = 0;
+
+	public void testRead(String filepath) throws Throwable {
+		BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+				new FileInputStream(filepath)), 16 * 1024 * 1024);
+		String line = null;
+		String delimiter_ = new String(",");
+		RowMutationMessage rmInbox = null;
+		RowMutationMessage rmOutbox = null;
+		ColumnFamily cfInbox = null;
+		ColumnFamily cfOutbox = null;
+		while ((line = bufReader.readLine()) != null) {
+			StringTokenizer st = new StringTokenizer(line, delimiter_);
+			int i = 0;
+			String threadId = null;
+			int lastUpdated = 0;
+			int isDeleted = 0;
+			int folder = 0;
+			String user = null;
+			while (st.hasMoreElements()) {
+				switch (i) {
+				case 0:
+					user = (String) st.nextElement();// sb.append((String)st.nextElement());
+					break;
+
+				case 1:
+					folder = Integer.parseInt((String) st.nextElement());// sb.append((String)st.nextElement());
+					break;
+
+				case 2:
+					threadId = (String) st.nextElement();
+					break;
+
+				case 3:
+					lastUpdated = Integer.parseInt((String) st.nextElement());
+					break;
+
+				case 4:
+					isDeleted = Integer.parseInt((String) st.nextElement());// (String)st.nextElement();
+					break;
+
+				default:
+					break;
+				}
+				++i;
+			}
+			String key = null;
+			if (folder == 0) {
+				key = user + ":0";
+			} else {
+				key = user + ":1";
+			}
+
+			ReadMessage readMessage = new ReadMessage(tablename_, key);
+			Message message = new Message(from_, StorageService.readStage_,
+					StorageService.readVerbHandler_,
+					new Object[] { readMessage });
+			IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(
+					message, to_);
+			Object[] result = iar.get();
+			ReadResponseMessage readResponseMessage = (ReadResponseMessage) result[0];
+			Row row = readResponseMessage.row();
+			if (row == null) {
+				logger_.debug("ERROR No row for this key .....: " + line);
+                Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+				errorCount_++;
+			} else {
+				Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+				if (cfMap == null || cfMap.size() == 0) {
+					logger_
+							.debug("ERROR ColumnFamil map is missing.....: "
+									+ threadId + "   key:" + key
+									+ "    record:" + line);
+					System.out
+							.println("ERROR ColumnFamil map is missing.....: "
+									+ threadId + "   key:" + key
+									+ "    record:" + line);
+					errorCount_++;
+					continue;
+				}
+				ColumnFamily cfamily = cfMap.get(columnFamily_);
+				if (cfamily == null) {
+					logger_
+							.debug("ERROR ColumnFamily  is missing.....: "
+									+ threadId + "   key:" + key
+									+ "    record:" + line);
+					System.out
+							.println("ERROR ColumnFamily  is missing.....: "
+									+ threadId + "   key:" + key
+									+ "    record:" + line);
+					errorCount_++;
+					continue;
+				}
+
+				IColumn clmn = cfamily.getColumn(threadId);
+				queryCount_++;
+				if (clmn == null) {
+					logger_.debug("ERROR Column is missing.....: " + threadId
+							+ "    record:" + line);
+					System.out.println("ERROR Column is missing.....: "
+							+ threadId + "    record:" + line);
+                    Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+					errorCount_++;
+				} else {
+					// logger_.debug("SUCCESS .....for column : "+clmn.name()+"
+					// Record:" + line);
+                    Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+				}
+			}
+			
+		}
+
+	}
+
+	/**
+	 * Splits the given text into terms by running it through a
+	 * StandardAnalyzer token stream (field name "superColumn").
+	 *
+	 * @param string text to analyze
+	 * @return the term text of every token, in stream order; if reading the
+	 *         stream fails, the stack trace is printed and the terms collected
+	 *         so far are returned
+	 */
+	private List<String> tokenize(String string)
+	{
+		List<String> terms = new ArrayList<String>();
+		Analyzer analyzer = new StandardAnalyzer();
+		TokenStream stream = analyzer.tokenStream("superColumn", new StringReader(string));
+		try
+		{
+			for (Token current = stream.next(); current != null; current = stream.next())
+			{
+				terms.add(current.termText());
+			}
+		}
+		catch(IOException ex)
+		{
+			ex.printStackTrace();
+		}
+		return terms;
+	}
+	// Number of timed thrift requests issued so far.
+	private int numReqs_ = 0;
+	// Accumulated duration of those requests in milliseconds;
+	// totalTime_/numReqs_ is logged as the running average.
+	private long totalTime_ = 0 ;
+	
+	public void testReadThrift(String filepath) throws Throwable {
+		BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+				new FileInputStream(filepath)), 16 * 1024 * 1024);
+		String line = null;
+		String delimiter_ = new String(",");
+		RowMutationMessage rmInbox = null;
+		RowMutationMessage rmOutbox = null;
+		ColumnFamily cfInbox = null;
+		ColumnFamily cfOutbox = null;
+		String firstuser = null ;
+		String nextuser = null;
+		while ((line = bufReader.readLine()) != null) {
+			StringTokenizer st = new StringTokenizer(line, delimiter_);
+			int i = 0;
+			String threadId = null;
+			int lastUpdated = 0;
+			int isDeleted = 0;
+			int folder = 0;
+			String user = null;
+			while (st.hasMoreElements()) {
+				switch (i) {
+				case 0:
+					user = (String) st.nextElement();// sb.append((String)st.nextElement());
+                    if ( !isNumeric(user))
+                        continue;
+					break;
+
+				case 1:
+					folder = Integer.parseInt((String) st.nextElement());// sb.append((String)st.nextElement());
+					break;
+
+				case 2:
+					threadId = (String) st.nextElement();
+					break;
+
+				case 3:
+					lastUpdated = Integer.parseInt((String) st.nextElement());
+					break;
+
+				case 4:
+					isDeleted = Integer.parseInt((String) st.nextElement());// (String)st.nextElement();
+					break;
+
+				default:
+					break;
+				}
+				++i;
+			}
+			String key = null;
+			if (folder == 0) {
+				key = user + ":0";
+			} else {
+				key = user + ":1";
+			}
+
+			nextuser = key;
+			if(firstuser == null || firstuser.compareTo(nextuser) != 0)
+			{
+				List<column_t> columns = null;
+				firstuser = key;
+				try {
+                    Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+					long t = System.currentTimeMillis();
+					
+					columns = peerstorageClient_.get_slice(tablename_,key,columnFamily_+(columnFamilyHack_%divideby_),0,10);
+					numReqs_++;
+					totalTime_ = totalTime_ + (System.currentTimeMillis() - t);
+					logger_.debug("Numreqs:" + numReqs_ + " Average: " + totalTime_/numReqs_+  "   Time taken for thrift..."
+							+ (System.currentTimeMillis() - t));
+				} catch (Exception e) {
+						e.printStackTrace();
+					}
+				if (columns == null) {
+					logger_.debug("ERROR No row for this key .....: " + line);
+                    Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+					errorCount_++;
+				} else {
+					if (columns.size() == 0) {
+						logger_
+								.debug("ERROR ColumnFamil map is missing.....: "
+										+ threadId + "   key:" + key
+										+ "    record:" + line);
+						System.out
+								.println("ERROR ColumnFamil map is missing.....: "
+										+ threadId + "   key:" + key
+										+ "    record:" + line);
+						errorCount_++;
+						continue;
+					}
+					else {
+						//logger_.debug("SUCCESS .....for key : "+key);
+						//System.out.println("SUCCESS .....for key : "+key);
+						//for(int j = 0 ; j< columns.size() ; j++ ){
+							//System.out.print("  " + columns.get(j)+",");
+						//}
+						// Record:" + line);
+						//Thread.sleep(5);
+					}
+				}
+				queryCount_++;
+			}
+		}
+
+	}
+	
+
+	public void testSuperReadThrift(String filepath) throws Throwable {
+		BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+				new FileInputStream(filepath)), 16 * 1024 * 1024);
+		String line = null;
+		String delimiter_ = new String(",");
+		RowMutationMessage rmInbox = null;
+		RowMutationMessage rmOutbox = null;
+		ColumnFamily cfInbox = null;
+		ColumnFamily cfOutbox = null;
+		String firstuser = null ;
+		String nextuser = null;
+		while ((line = bufReader.readLine()) != null) {
+			StringTokenizer st = new StringTokenizer(line, delimiter_);
+			int i = 0;
+			String threadId = null;
+			int lastUpdated = 0;
+			int isDeleted = 0;
+			int folder = 0;
+			String user = null;
+			String subject = null;
+			String body = null;
+			while (st.hasMoreElements()) {
+				switch (i) {
+				case 0:
+					user = (String) st.nextElement();// sb.append((String)st.nextElement());
+                    if ( !isNumeric(user))
+                        continue;
+					break;
+
+				case 1:
+					folder = Integer.parseInt((String) st.nextElement());// sb.append((String)st.nextElement());
+					break;
+
+				case 2:
+					threadId = (String) st.nextElement();
+					break;
+
+				case 3:
+					lastUpdated = Integer.parseInt((String) st.nextElement());
+					break;
+
+				case 4:
+					isDeleted = Integer.parseInt((String) st.nextElement());// (String)st.nextElement();
+					break;
+				
+				case 5:
+					st.nextElement();
+					break;
+
+
+				case 6:
+					st.nextElement();
+					break;
+
+				case 7:
+					subject = (String) st.nextElement();
+					break;
+
+				case 8:
+					body = (String) st.nextElement();
+					break;
+				default:
+					st.nextElement();
+					break;
+				}
+				++i;
+			}
+			String key = null;
+			if (folder == 0) {
+				key = user ;//+ ":0";
+			} else {
+				key = user ;//+ ":1";
+			}
+
+			List<column_t> columns = null;
+			firstuser = key;
+			try {
+                Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+				if( subject == null )
+					subject = "";
+				if( body == null )
+					body = "";
+				List<String> tokenList = tokenize(subject + " " + body ) ;
+				
+				for( String token: tokenList )
+				{
+					long t = System.currentTimeMillis();
+					columns = peerstorageClient_.get_slice(tablename_,key,"MailboxThreadList"+(columnFamilyHack_%divideby_)+":"+token,0,10);
+					totalTime_ = totalTime_ + (System.currentTimeMillis() - t);
+					numReqs_++;
+					logger_.debug("Numreqs:" + numReqs_ + " Average: " + totalTime_/numReqs_+  "   Time taken for thrift..."
+							+ (System.currentTimeMillis() - t));
+					if (columns == null) {
+						logger_.debug(" TOKEN: " + token + "  ERROR No row for this key .....: " + line);
+	                    Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+						errorCount_++;
+					} else {
+						if (columns.size() == 0) {
+							logger_
+									.debug("ERROR ColumnFamil map is missing.....: "
+											+ threadId + "   key:" + key
+											+ " TOKEN: " + token
+											+ "    record:" + line);
+							System.out
+									.println("ERROR ColumnFamil map is missing.....: "
+											+ threadId + "   key:" + key
+											+ " TOKEN: " + token
+											+ "    record:" + line);
+							errorCount_++;
+							continue;
+						}
+						else {
+							boolean found = false;
+							for(column_t column : columns)
+							{
+								if(column.columnName.equalsIgnoreCase(threadId))
+								{
+									found = true ;
+									break;
+								}
+							}
+							if(!found)
+							{
+								logger_
+										.debug("ERROR column is missing.....: "
+												+ threadId + "   key:" + key
+												+ " TOKEN: " + token
+												+ "    record:" + line);
+								System.out
+										.println("ERROR column is missing.....: "
+												+ threadId + "   key:" + key
+												+ " TOKEN: " + token
+												+ "    record:" + line);
+								errorCount_++;
+										
+							}
+							//logger_.debug("SUCCESS .....for key : "+key);
+							//System.out.println("SUCCESS .....for key : "+key);
+							//for(int j = 0 ; j< columns.size() ; j++ ){
+								//System.out.print("  " + columns.get(j)+",");
+							//}
+							// Record:" + line);
+							//Thread.sleep(5);
+						}
+					}
+				}
+				queryCount_++;
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+				
+		}
+
+	}
+	
+	
+	
+	/**
+	 * Replays a comma separated data file as throttled row mutations.
+	 *
+	 * Fields are positional: 0 = user, 1 = folder, 2 = thread id,
+	 * 3 = last updated, 4 = is deleted; any further fields are ignored.
+	 * Consecutive records of the same user are batched into two mutations,
+	 * keyed user + ":0" (folder 0) and user + ":1" (any other folder). Both
+	 * mutations are handed to applyLoad whenever the user changes and once
+	 * more at the end of the file.
+	 *
+	 * @param filepath path of the data file to replay
+	 * @throws Throwable on I/O failure or on a malformed numeric field
+	 */
+	public void testLoadGeneratorBatchRunner(String filepath) throws Throwable
+	{
+		BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+				new FileInputStream(filepath)), 16 * 1024 * 1024);
+		try
+		{
+			String line = null;
+			String firstuser = null;
+			RowMutation rmInbox = null;
+			RowMutation rmOutbox = null;
+			while ((line = bufReader.readLine()) != null)
+			{
+				StringTokenizer st = new StringTokenizer(line, ",");
+				int i = 0;
+				String threadId = null;
+				int lastUpdated = 0;
+				int isDeleted = 0;
+				int folder = 0;
+				String user = null;
+				while (st.hasMoreTokens())
+				{
+					// Consume the token up front so that a record with more
+					// than five fields cannot spin forever in the default
+					// branch.
+					String field = st.nextToken();
+					switch (i)
+					{
+					case 0:
+						user = field;
+						break;
+
+					case 1:
+						folder = Integer.parseInt(field);
+						break;
+
+					case 2:
+						threadId = field;
+						break;
+
+					case 3:
+						lastUpdated = Integer.parseInt(field);
+						break;
+
+					case 4:
+						isDeleted = Integer.parseInt(field);
+						break;
+
+					default:
+						break;
+					}
+					++i;
+				}
+
+				// A line without any field used to end in a NullPointerException
+				// or in a mutation for the key "null:0"; skip it instead.
+				if (user == null)
+				{
+					continue;
+				}
+
+				if (firstuser == null || !firstuser.equals(user))
+				{
+					// New user: flush what was batched for the previous one.
+					firstuser = user;
+					if (rmInbox != null)
+					{
+						applyLoad(rmInbox);
+					}
+					if (rmOutbox != null)
+					{
+						applyLoad(rmOutbox);
+					}
+					rmInbox = new RowMutation(tablename_, firstuser + ":0");
+					rmOutbox = new RowMutation(tablename_, firstuser + ":1");
+				}
+
+				RowMutation target = (folder == 0) ? rmInbox : rmOutbox;
+				target.add(columnFamily_+(columnFamilyHack_%divideby_)+":"+threadId, String.valueOf(isDeleted).getBytes(), lastUpdated);
+			}
+
+			// Flush the batch of the last user in the file.
+			if (firstuser != null)
+			{
+				if (rmInbox != null)
+				{
+					applyLoad(rmInbox);
+				}
+				if (rmOutbox != null)
+				{
+					applyLoad(rmOutbox);
+				}
+			}
+		}
+		finally
+		{
+			// The reader used to be leaked on every call.
+			bufReader.close();
+		}
+	}
+    
+    /**
+     * Sends the given mutation to the end point on port 7000 and then sleeps
+     * for 1000/requestsPerSecond_ milliseconds, so that callers which do no
+     * other waiting generate roughly requestsPerSecond_ requests per second.
+     * Each call also increments counter_ and columnFamilyHack_.
+     *
+     * Failures, including an interrupted sleep, are printed and otherwise
+     * ignored.
+     *
+     * @param rm mutation to send
+     */
+    public void applyLoad(RowMutation rm) throws IOException {
+        try
+        {
+            counter_.incrementAndGet();
+            columnFamilyHack_++;
+
+            EndPoint target = new EndPoint(7000);
+            Object[] payload = new Object[]{ new RowMutationMessage(rm) };
+            Message outbound = new Message(target,
+                    StorageService.mutationStage_,
+                    StorageService.mutationVerbHandler_,
+                    payload);
+            MessagingService.getMessagingInstance().sendRR(outbound, target);
+
+            // Throttle only after the request has been handed off.
+            long pauseMillis = 1000/requestsPerSecond_;
+            int pauseNanos = 1000%requestsPerSecond_;
+            Thread.sleep(pauseMillis, pauseNanos);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    
+    
+	/**
+	 * Replays a comma separated data file as thrift remove calls.
+	 *
+	 * Fields are positional: 0 = user, 1 = folder, 2 = thread id,
+	 * 3 = last updated, 4 = is deleted; any further fields are ignored. The
+	 * row key is user + ":0" when the folder is 0 and user + ":1" otherwise.
+	 * A remove is only issued when the key differs from the key of the
+	 * previous record, using that record's thread id. Latency is accumulated
+	 * in totalTime_ and numReqs_.
+	 *
+	 * @param filepath path of the data file to replay
+	 * @throws Throwable on I/O failure or on a malformed numeric field
+	 */
+	public void testRemove(String filepath) throws Throwable {
+		BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+				new FileInputStream(filepath)), 16 * 1024 * 1024);
+		try {
+			String line = null;
+			String firstuser = null;
+			while ((line = bufReader.readLine()) != null) {
+				StringTokenizer st = new StringTokenizer(line, ",");
+				int i = 0;
+				String threadId = null;
+				int folder = 0;
+				String user = null;
+				while (st.hasMoreTokens()) {
+					// Consume the token up front so that a record with more
+					// than five fields cannot spin forever in the default
+					// branch.
+					String field = st.nextToken();
+					switch (i) {
+					case 0:
+						user = field;
+						// NOTE(review): this continues the token loop without
+						// advancing i, so the next token is tried as the user
+						// again. It may have been meant to skip the whole
+						// record - confirm before changing.
+						if ( !isNumeric(user))
+							continue;
+						break;
+
+					case 1:
+						folder = Integer.parseInt(field);
+						break;
+
+					case 2:
+						threadId = field;
+						break;
+
+					case 3: // last updated
+					case 4: // is deleted
+						// Not needed for a remove, but still parsed so that a
+						// malformed record is rejected.
+						Integer.parseInt(field);
+						break;
+
+					default:
+						break;
+					}
+					++i;
+				}
+				String key = (folder == 0) ? user + ":0" : user + ":1";
+
+				// Consecutive records with the same key trigger only one remove.
+				if (firstuser != null && firstuser.equals(key)) {
+					continue;
+				}
+				firstuser = key;
+
+				try {
+					Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+					long t = System.currentTimeMillis();
+
+					// NOTE(review): the column path has no columnFamily_ prefix,
+					// unlike the path testLoadGeneratorBatchRunner writes to
+					// (columnFamily_ + hack + ":" + threadId) - confirm this is
+					// intended.
+					peerstorageClient_.remove(tablename_,key,(columnFamilyHack_%divideby_)+":"+threadId);
+					numReqs_++;
+					totalTime_ = totalTime_ + (System.currentTimeMillis() - t);
+					logger_.debug("Numreqs:" + numReqs_ + " Average: " + totalTime_/numReqs_+  "   Time taken for thrift..."
+							+ (System.currentTimeMillis() - t));
+				} catch (Exception e) {
+					// Best effort: report the failure and go on to the next record.
+					e.printStackTrace();
+				}
+			}
+		} finally {
+			// The reader used to be leaked on every call.
+			bufReader.close();
+		}
+	}
+
+	/**
+	 * Command line entry point of the importer.
+	 *
+	 * Expected arguments:
+	 *   args[0] test to run: -testRead, -testWrite, -testWriteSuper,
+	 *           -testReadSuper, -testRemove, -testPhp, -testSuperUserWrite or
+	 *           -testWriteMailbox
+	 *   args[1] index of this loader, or -1 to process every file
+	 *   args[2] directory holding the data files
+	 *   args[3] optional requests per second
+	 *   args[4] optional total number of loaders (default 5)
+	 *   args[5] optional value for divideby_
+	 *
+	 * The files of the directory are split into equally sized slices, one per
+	 * loader; the last loader also takes the remainder. An unknown or missing
+	 * option only prints "Invalid option".
+	 *
+	 * @param args command line arguments as described above
+	 * @throws IllegalArgumentException if the index or directory argument is
+	 *             missing, or the directory cannot be listed
+	 * @throws Throwable whatever the selected test throws
+	 */
+	public void run(String[] args) throws Throwable
+	{
+		String option = (args.length > 0) ? args[0] : null;
+		boolean knownOption = "-testWriteMailbox".equals(option)
+				|| "-testSuperUserWrite".equals(option)
+				|| "-testPhp".equals(option)
+				|| "-testWrite".equals(option)
+				|| "-testRead".equals(option)
+				|| "-testWriteSuper".equals(option)
+				|| "-testReadSuper".equals(option)
+				|| "-testRemove".equals(option);
+		if (!knownOption)
+		{
+			System.out.println("Invalid option");
+			return;
+		}
+		if (args.length < 3)
+		{
+			throw new IllegalArgumentException("Usage: " + option
+					+ " <index> <directory> [requestsPerSecond] [totalServers] [divideBy]");
+		}
+
+		int totalNumofServers = 5; // total number of servers we want to run on
+		if (args.length > 4) {
+			totalNumofServers = Integer.parseInt(args[4]);
+		}
+		if (args.length > 5) {
+			divideby_ = Integer.parseInt(args[5]);
+		}
+		int index = Integer.parseInt(args[1]);
+
+		// List the directory once: File.list() returns null when the path is
+		// not a directory or cannot be read, and gives no ordering guarantee
+		// between calls.
+		File file = new File(args[2]);
+		String[] fileNames = file.list();
+		if (fileNames == null)
+		{
+			throw new IllegalArgumentException("Cannot list directory: " + args[2]);
+		}
+		int fileCount = fileNames.length;
+
+		int start = 0;
+		int end = fileCount;
+		if (index != -1) {
+			int skip = fileCount / totalNumofServers;
+			if (fileCount != 0 && skip == 0) {
+				skip = 1;
+			}
+			start = skip * index;
+			end = (index == totalNumofServers - 1) ? fileCount : start + skip;
+			// With fewer files than servers (or an out-of-range index) the
+			// slice can lie outside the listing; clamp it so that such a
+			// loader simply processes nothing.
+			start = Math.max(0, Math.min(start, fileCount));
+			end = Math.min(end, fileCount);
+		}
+		if (args.length > 3) {
+			requestsPerSecond_ = Integer.parseInt(args[3]);
+		}
+
+		long t = System.currentTimeMillis();
+		try
+		{
+			try
+			{
+				peerstorageClient_ = connect();
+				for (int i = start; i < end; i++) {
+					String path = args[2] + System.getProperty("file.separator")
+							+ fileNames[i];
+					if ("-testRead".equals(option)) {
+						testReadThrift(path);
+					} else if ("-testWrite".equals(option)) {
+						testBatchRunner(path);
+					} else if ("-testWriteSuper".equals(option)) {
+						testSuperBatchRunner(path);
+					} else if ("-testReadSuper".equals(option)) {
+						testSuperReadThrift(path);
+					} else if ("-testRemove".equals(option)) {
+						testRemove(path);
+					} else if ("-testPhp".equals(option)) {
+						testPhp(path);
+					} else if ("-testSuperUserWrite".equals(option)) {
+						testSuperUserBatchRunner(path);
+					} else if ("-testWriteMailbox".equals(option)) {
+						testMailboxBatchRunner(path);
+					}
+					System.out.println(path);
+				}
+			}
+			finally
+			{
+				// Release the connection even when a test fails.
+				if(transport_ != null)
+					transport_.close();
+			}
+			System.out.println("start :" + start + "    end : " + end);
+			System.out.println("Time taken  .."
+					+ (System.currentTimeMillis() - t));
+			System.out.println("Keys sent over: " + counter_.get());
+		}
+		finally
+		{
+			fos_.close();
+		}
+	}
+
+
+	
+	/**
+	 * Launches the search_test.php script once for every record of a comma
+	 * separated data file, passing the file name, the user, the thread id and
+	 * the raw record on the command line. The processes are started and not
+	 * waited for.
+	 *
+	 * Fields are positional: 0 = user, 1 = folder, 2 = thread id,
+	 * 3 = last updated, 4 = is deleted; any further fields are ignored.
+	 *
+	 * @param filepath path of the data file to replay
+	 * @throws IOException if the file cannot be read or a process cannot be
+	 *             started
+	 */
+	public void testPhp(String filepath) throws IOException {
+		BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+				new FileInputStream(filepath)), 16 * 1024 * 1024);
+		try {
+			String fileName = (new File(filepath)).getName();
+			String line = null;
+			while ((line = bufReader.readLine()) != null) {
+				StringTokenizer st = new StringTokenizer(line, ",");
+				int i = 0;
+				String threadId = null;
+				String user = null;
+				while (st.hasMoreTokens()) {
+					// Consume the token up front so that a record with more
+					// than five fields cannot spin forever in the default
+					// branch.
+					String field = st.nextToken();
+					switch (i) {
+					case 0:
+						user = field;
+						// NOTE(review): this continues the token loop without
+						// advancing i, so the next token is tried as the user
+						// again. It may have been meant to skip the whole
+						// record - confirm before changing.
+						if ( !isNumeric(user))
+							continue;
+						break;
+
+					case 1: // folder
+					case 3: // last updated
+					case 4: // is deleted
+						// Not passed to the script, but still parsed so that a
+						// malformed record is rejected.
+						Integer.parseInt(field);
+						break;
+
+					case 2:
+						threadId = field;
+						break;
+
+					default:
+						break;
+					}
+					++i;
+				}
+				// NOTE(review): exec(String) splits the command on whitespace, so
+				// a record containing blanks is passed as several arguments, and
+				// the output of the child is never read. Kept as is because the
+				// script's expectations are not visible here.
+				String cmd = "php /home/pmalik/www/scripts/mbox_index/search_test.php  "
+					+ fileName + "  " + user +"  " +threadId + "  "+ line;
+				Runtime.getRuntime().exec(cmd);
+			}
+		} finally {
+			// The reader used to be leaked on every call.
+			bufReader.close();
+		}
+	}
+    class PhpExecute implements Runnable
+    {
+        /* Command line that run() launches. */
+        private String cmdLine_;
+
+        PhpExecute(String cmdLine)
+        {
+            this.cmdLine_ = cmdLine;
+        }
+
+        /**
+         * Prints the command line and starts it as an external process
+         * without waiting for it to finish. Failures are printed and
+         * otherwise ignored.
+         */
+        public void run()
+        {
+            try
+            {
+                System.out.println(cmdLine_);
+                Runtime runtime = Runtime.getRuntime();
+                runtime.exec(cmdLine_);
+            }
+            catch (Exception ex)
+            {
+                ex.printStackTrace();
+            }
+        }
+    }
+}