You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC
svn commit: r749207 [10/12] - in
/incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/
net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/
Added: incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_super_t.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_super_t.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_super_t.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_super_t.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,287 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+public class batch_mutation_super_t implements TBase, java.io.Serializable {
+ public String table;
+ public String key;
+ public Map<String,List<superColumn_t>> cfmap;
+ public Map<String,List<superColumn_t>> cfmapdel;
+
+ public final Isset __isset = new Isset();
+ public static final class Isset implements java.io.Serializable {
+ public boolean table = false;
+ public boolean key = false;
+ public boolean cfmap = false;
+ public boolean cfmapdel = false;
+ }
+
+ public batch_mutation_super_t() {
+ }
+
+ public batch_mutation_super_t(
+ String table,
+ String key,
+ Map<String,List<superColumn_t>> cfmap,
+ Map<String,List<superColumn_t>> cfmapdel)
+ {
+ this();
+ this.table = table;
+ this.__isset.table = true;
+ this.key = key;
+ this.__isset.key = true;
+ this.cfmap = cfmap;
+ this.__isset.cfmap = true;
+ this.cfmapdel = cfmapdel;
+ this.__isset.cfmapdel = true;
+ }
+
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof batch_mutation_super_t)
+ return this.equals((batch_mutation_super_t)that);
+ return false;
+ }
+
+ public boolean equals(batch_mutation_super_t that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_table = true && (this.table != null);
+ boolean that_present_table = true && (that.table != null);
+ if (this_present_table || that_present_table) {
+ if (!(this_present_table && that_present_table))
+ return false;
+ if (!this.table.equals(that.table))
+ return false;
+ }
+
+ boolean this_present_key = true && (this.key != null);
+ boolean that_present_key = true && (that.key != null);
+ if (this_present_key || that_present_key) {
+ if (!(this_present_key && that_present_key))
+ return false;
+ if (!this.key.equals(that.key))
+ return false;
+ }
+
+ boolean this_present_cfmap = true && (this.cfmap != null);
+ boolean that_present_cfmap = true && (that.cfmap != null);
+ if (this_present_cfmap || that_present_cfmap) {
+ if (!(this_present_cfmap && that_present_cfmap))
+ return false;
+ if (!this.cfmap.equals(that.cfmap))
+ return false;
+ }
+
+ boolean this_present_cfmapdel = true && (this.cfmapdel != null);
+ boolean that_present_cfmapdel = true && (that.cfmapdel != null);
+ if (this_present_cfmapdel || that_present_cfmapdel) {
+ if (!(this_present_cfmapdel && that_present_cfmapdel))
+ return false;
+ if (!this.cfmapdel.equals(that.cfmapdel))
+ return false;
+ }
+
+ return true;
+ }
+
+ public int hashCode() {
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id)
+ {
+ case 1:
+ if (field.type == TType.STRING) {
+ this.table = iprot.readString();
+ this.__isset.table = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2:
+ if (field.type == TType.STRING) {
+ this.key = iprot.readString();
+ this.__isset.key = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 3:
+ if (field.type == TType.MAP) {
+ {
+ TMap _map22 = iprot.readMapBegin();
+ this.cfmap = new HashMap<String,List<superColumn_t>>(2*_map22.size);
+ for (int _i23 = 0; _i23 < _map22.size; ++_i23)
+ {
+ String _key24;
+ List<superColumn_t> _val25;
+ _key24 = iprot.readString();
+ {
+ TList _list26 = iprot.readListBegin();
+ _val25 = new ArrayList<superColumn_t>(_list26.size);
+ for (int _i27 = 0; _i27 < _list26.size; ++_i27)
+ {
+ superColumn_t _elem28 = new superColumn_t();
+ _elem28 = new superColumn_t();
+ _elem28.read(iprot);
+ _val25.add(_elem28);
+ }
+ iprot.readListEnd();
+ }
+ this.cfmap.put(_key24, _val25);
+ }
+ iprot.readMapEnd();
+ }
+ this.__isset.cfmap = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 4:
+ if (field.type == TType.MAP) {
+ {
+ TMap _map29 = iprot.readMapBegin();
+ this.cfmapdel = new HashMap<String,List<superColumn_t>>(2*_map29.size);
+ for (int _i30 = 0; _i30 < _map29.size; ++_i30)
+ {
+ String _key31;
+ List<superColumn_t> _val32;
+ _key31 = iprot.readString();
+ {
+ TList _list33 = iprot.readListBegin();
+ _val32 = new ArrayList<superColumn_t>(_list33.size);
+ for (int _i34 = 0; _i34 < _list33.size; ++_i34)
+ {
+ superColumn_t _elem35 = new superColumn_t();
+ _elem35 = new superColumn_t();
+ _elem35.read(iprot);
+ _val32.add(_elem35);
+ }
+ iprot.readListEnd();
+ }
+ this.cfmapdel.put(_key31, _val32);
+ }
+ iprot.readMapEnd();
+ }
+ this.__isset.cfmapdel = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ break;
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ TStruct struct = new TStruct("batch_mutation_super_t");
+ oprot.writeStructBegin(struct);
+ TField field = new TField();
+ if (this.table != null) {
+ field.name = "table";
+ field.type = TType.STRING;
+ field.id = 1;
+ oprot.writeFieldBegin(field);
+ oprot.writeString(this.table);
+ oprot.writeFieldEnd();
+ }
+ if (this.key != null) {
+ field.name = "key";
+ field.type = TType.STRING;
+ field.id = 2;
+ oprot.writeFieldBegin(field);
+ oprot.writeString(this.key);
+ oprot.writeFieldEnd();
+ }
+ if (this.cfmap != null) {
+ field.name = "cfmap";
+ field.type = TType.MAP;
+ field.id = 3;
+ oprot.writeFieldBegin(field);
+ {
+ oprot.writeMapBegin(new TMap(TType.STRING, TType.LIST, this.cfmap.size()));
+ for (String _iter36 : this.cfmap.keySet()) {
+ oprot.writeString(_iter36);
+ {
+ oprot.writeListBegin(new TList(TType.STRUCT, this.cfmap.get(_iter36).size()));
+ for (superColumn_t _iter37 : this.cfmap.get(_iter36)) {
+ _iter37.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ if (this.cfmapdel != null) {
+ field.name = "cfmapdel";
+ field.type = TType.MAP;
+ field.id = 4;
+ oprot.writeFieldBegin(field);
+ {
+ oprot.writeMapBegin(new TMap(TType.STRING, TType.LIST, this.cfmapdel.size()));
+ for (String _iter38 : this.cfmapdel.keySet()) {
+ oprot.writeString(_iter38);
+ {
+ oprot.writeListBegin(new TList(TType.STRUCT, this.cfmapdel.get(_iter38).size()));
+ for (superColumn_t _iter39 : this.cfmapdel.get(_iter38)) {
+ _iter39.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder("batch_mutation_super_t(");
+ sb.append("table:");
+ sb.append(this.table);
+ sb.append(",key:");
+ sb.append(this.key);
+ sb.append(",cfmap:");
+ sb.append(this.cfmap);
+ sb.append(",cfmapdel:");
+ sb.append(this.cfmapdel);
+ sb.append(")");
+ return sb.toString();
+ }
+
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_t.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_t.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_t.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_t.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,287 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+public class batch_mutation_t implements TBase, java.io.Serializable {
+ public String table;
+ public String key;
+ public Map<String,List<column_t>> cfmap;
+ public Map<String,List<column_t>> cfmapdel;
+
+ public final Isset __isset = new Isset();
+ public static final class Isset implements java.io.Serializable {
+ public boolean table = false;
+ public boolean key = false;
+ public boolean cfmap = false;
+ public boolean cfmapdel = false;
+ }
+
+ public batch_mutation_t() {
+ }
+
+ public batch_mutation_t(
+ String table,
+ String key,
+ Map<String,List<column_t>> cfmap,
+ Map<String,List<column_t>> cfmapdel)
+ {
+ this();
+ this.table = table;
+ this.__isset.table = true;
+ this.key = key;
+ this.__isset.key = true;
+ this.cfmap = cfmap;
+ this.__isset.cfmap = true;
+ this.cfmapdel = cfmapdel;
+ this.__isset.cfmapdel = true;
+ }
+
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof batch_mutation_t)
+ return this.equals((batch_mutation_t)that);
+ return false;
+ }
+
+ public boolean equals(batch_mutation_t that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_table = true && (this.table != null);
+ boolean that_present_table = true && (that.table != null);
+ if (this_present_table || that_present_table) {
+ if (!(this_present_table && that_present_table))
+ return false;
+ if (!this.table.equals(that.table))
+ return false;
+ }
+
+ boolean this_present_key = true && (this.key != null);
+ boolean that_present_key = true && (that.key != null);
+ if (this_present_key || that_present_key) {
+ if (!(this_present_key && that_present_key))
+ return false;
+ if (!this.key.equals(that.key))
+ return false;
+ }
+
+ boolean this_present_cfmap = true && (this.cfmap != null);
+ boolean that_present_cfmap = true && (that.cfmap != null);
+ if (this_present_cfmap || that_present_cfmap) {
+ if (!(this_present_cfmap && that_present_cfmap))
+ return false;
+ if (!this.cfmap.equals(that.cfmap))
+ return false;
+ }
+
+ boolean this_present_cfmapdel = true && (this.cfmapdel != null);
+ boolean that_present_cfmapdel = true && (that.cfmapdel != null);
+ if (this_present_cfmapdel || that_present_cfmapdel) {
+ if (!(this_present_cfmapdel && that_present_cfmapdel))
+ return false;
+ if (!this.cfmapdel.equals(that.cfmapdel))
+ return false;
+ }
+
+ return true;
+ }
+
+ public int hashCode() {
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id)
+ {
+ case 1:
+ if (field.type == TType.STRING) {
+ this.table = iprot.readString();
+ this.__isset.table = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2:
+ if (field.type == TType.STRING) {
+ this.key = iprot.readString();
+ this.__isset.key = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 3:
+ if (field.type == TType.MAP) {
+ {
+ TMap _map0 = iprot.readMapBegin();
+ this.cfmap = new HashMap<String,List<column_t>>(2*_map0.size);
+ for (int _i1 = 0; _i1 < _map0.size; ++_i1)
+ {
+ String _key2;
+ List<column_t> _val3;
+ _key2 = iprot.readString();
+ {
+ TList _list4 = iprot.readListBegin();
+ _val3 = new ArrayList<column_t>(_list4.size);
+ for (int _i5 = 0; _i5 < _list4.size; ++_i5)
+ {
+ column_t _elem6 = new column_t();
+ _elem6 = new column_t();
+ _elem6.read(iprot);
+ _val3.add(_elem6);
+ }
+ iprot.readListEnd();
+ }
+ this.cfmap.put(_key2, _val3);
+ }
+ iprot.readMapEnd();
+ }
+ this.__isset.cfmap = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 4:
+ if (field.type == TType.MAP) {
+ {
+ TMap _map7 = iprot.readMapBegin();
+ this.cfmapdel = new HashMap<String,List<column_t>>(2*_map7.size);
+ for (int _i8 = 0; _i8 < _map7.size; ++_i8)
+ {
+ String _key9;
+ List<column_t> _val10;
+ _key9 = iprot.readString();
+ {
+ TList _list11 = iprot.readListBegin();
+ _val10 = new ArrayList<column_t>(_list11.size);
+ for (int _i12 = 0; _i12 < _list11.size; ++_i12)
+ {
+ column_t _elem13 = new column_t();
+ _elem13 = new column_t();
+ _elem13.read(iprot);
+ _val10.add(_elem13);
+ }
+ iprot.readListEnd();
+ }
+ this.cfmapdel.put(_key9, _val10);
+ }
+ iprot.readMapEnd();
+ }
+ this.__isset.cfmapdel = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ break;
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ TStruct struct = new TStruct("batch_mutation_t");
+ oprot.writeStructBegin(struct);
+ TField field = new TField();
+ if (this.table != null) {
+ field.name = "table";
+ field.type = TType.STRING;
+ field.id = 1;
+ oprot.writeFieldBegin(field);
+ oprot.writeString(this.table);
+ oprot.writeFieldEnd();
+ }
+ if (this.key != null) {
+ field.name = "key";
+ field.type = TType.STRING;
+ field.id = 2;
+ oprot.writeFieldBegin(field);
+ oprot.writeString(this.key);
+ oprot.writeFieldEnd();
+ }
+ if (this.cfmap != null) {
+ field.name = "cfmap";
+ field.type = TType.MAP;
+ field.id = 3;
+ oprot.writeFieldBegin(field);
+ {
+ oprot.writeMapBegin(new TMap(TType.STRING, TType.LIST, this.cfmap.size()));
+ for (String _iter14 : this.cfmap.keySet()) {
+ oprot.writeString(_iter14);
+ {
+ oprot.writeListBegin(new TList(TType.STRUCT, this.cfmap.get(_iter14).size()));
+ for (column_t _iter15 : this.cfmap.get(_iter14)) {
+ _iter15.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ if (this.cfmapdel != null) {
+ field.name = "cfmapdel";
+ field.type = TType.MAP;
+ field.id = 4;
+ oprot.writeFieldBegin(field);
+ {
+ oprot.writeMapBegin(new TMap(TType.STRING, TType.LIST, this.cfmapdel.size()));
+ for (String _iter16 : this.cfmapdel.keySet()) {
+ oprot.writeString(_iter16);
+ {
+ oprot.writeListBegin(new TList(TType.STRUCT, this.cfmapdel.get(_iter16).size()));
+ for (column_t _iter17 : this.cfmapdel.get(_iter16)) {
+ _iter17.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder("batch_mutation_t(");
+ sb.append("table:");
+ sb.append(this.table);
+ sb.append(",key:");
+ sb.append(this.key);
+ sb.append(",cfmap:");
+ sb.append(this.cfmap);
+ sb.append(",cfmapdel:");
+ sb.append(this.cfmapdel);
+ sb.append(")");
+ return sb.toString();
+ }
+
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/service/column_t.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/column_t.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/column_t.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/column_t.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,181 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+public class column_t implements TBase, java.io.Serializable {
+ public String columnName;
+ public String value;
+ public long timestamp;
+
+ public final Isset __isset = new Isset();
+ public static final class Isset implements java.io.Serializable {
+ public boolean columnName = false;
+ public boolean value = false;
+ public boolean timestamp = false;
+ }
+
+ public column_t() {
+ }
+
+ public column_t(
+ String columnName,
+ String value,
+ long timestamp)
+ {
+ this();
+ this.columnName = columnName;
+ this.__isset.columnName = true;
+ this.value = value;
+ this.__isset.value = true;
+ this.timestamp = timestamp;
+ this.__isset.timestamp = true;
+ }
+
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof column_t)
+ return this.equals((column_t)that);
+ return false;
+ }
+
+ public boolean equals(column_t that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_columnName = true && (this.columnName != null);
+ boolean that_present_columnName = true && (that.columnName != null);
+ if (this_present_columnName || that_present_columnName) {
+ if (!(this_present_columnName && that_present_columnName))
+ return false;
+ if (!this.columnName.equals(that.columnName))
+ return false;
+ }
+
+ boolean this_present_value = true && (this.value != null);
+ boolean that_present_value = true && (that.value != null);
+ if (this_present_value || that_present_value) {
+ if (!(this_present_value && that_present_value))
+ return false;
+ if (!this.value.equals(that.value))
+ return false;
+ }
+
+ boolean this_present_timestamp = true;
+ boolean that_present_timestamp = true;
+ if (this_present_timestamp || that_present_timestamp) {
+ if (!(this_present_timestamp && that_present_timestamp))
+ return false;
+ if (this.timestamp != that.timestamp)
+ return false;
+ }
+
+ return true;
+ }
+
+ public int hashCode() {
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id)
+ {
+ case 1:
+ if (field.type == TType.STRING) {
+ this.columnName = iprot.readString();
+ this.__isset.columnName = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2:
+ if (field.type == TType.STRING) {
+ this.value = iprot.readString();
+ this.__isset.value = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 3:
+ if (field.type == TType.I64) {
+ this.timestamp = iprot.readI64();
+ this.__isset.timestamp = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ break;
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ TStruct struct = new TStruct("column_t");
+ oprot.writeStructBegin(struct);
+ TField field = new TField();
+ if (this.columnName != null) {
+ field.name = "columnName";
+ field.type = TType.STRING;
+ field.id = 1;
+ oprot.writeFieldBegin(field);
+ oprot.writeString(this.columnName);
+ oprot.writeFieldEnd();
+ }
+ if (this.value != null) {
+ field.name = "value";
+ field.type = TType.STRING;
+ field.id = 2;
+ oprot.writeFieldBegin(field);
+ oprot.writeString(this.value);
+ oprot.writeFieldEnd();
+ }
+ field.name = "timestamp";
+ field.type = TType.I64;
+ field.id = 3;
+ oprot.writeFieldBegin(field);
+ oprot.writeI64(this.timestamp);
+ oprot.writeFieldEnd();
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder("column_t(");
+ sb.append("columnName:");
+ sb.append(this.columnName);
+ sb.append(",value:");
+ sb.append(this.value);
+ sb.append(",timestamp:");
+ sb.append(this.timestamp);
+ sb.append(")");
+ return sb.toString();
+ }
+
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/service/superColumn_t.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/superColumn_t.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/superColumn_t.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/superColumn_t.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,168 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+/**
+ * Thrift-serializable struct holding a named list of {@link column_t}.
+ *
+ * <p>Wire layout (field id: name, Thrift type): 1: name (STRING),
+ * 2: columns (LIST of STRUCT).
+ *
+ * <p>The file header marks this class as Thrift-generated, so hand edits made
+ * here are lost if the class is regenerated.
+ */
+public class superColumn_t implements TBase, java.io.Serializable {
+  public String name;
+  public List<column_t> columns;
+
+  /** Per-field flags set by the all-args constructor and by {@link #read}. */
+  public final Isset __isset = new Isset();
+  public static final class Isset implements java.io.Serializable {
+    public boolean name = false;
+    public boolean columns = false;
+  }
+
+  /** Creates an empty struct: both fields null, both flags false. */
+  public superColumn_t() {
+  }
+
+  /**
+   * Creates a struct with both fields assigned and both {@code __isset}
+   * flags raised, even when a supplied value is null.
+   */
+  public superColumn_t(
+    String name,
+    List<column_t> columns)
+  {
+    this();
+    this.name = name;
+    this.__isset.name = true;
+    this.columns = columns;
+    this.__isset.columns = true;
+  }
+
+  /** Delegates to {@link #equals(superColumn_t)} for same-type arguments. */
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof superColumn_t)
+      return this.equals((superColumn_t)that);
+    // null and foreign types are never equal to this struct
+    return false;
+  }
+
+  /**
+   * Field-wise equality over name and columns. Two null values of the same
+   * field count as equal; the {@code __isset} flags are ignored.
+   */
+  public boolean equals(superColumn_t that) {
+    if (that == null)
+      return false;
+    return sameValue(this.name, that.name)
+        && sameValue(this.columns, that.columns);
+  }
+
+  /**
+   * Hash built from the same two fields that {@code equals} compares, so
+   * equal structs hash alike while distinct ones spread across buckets.
+   * The fields are mutable: do not modify a struct while it is stored in a
+   * hash-based collection.
+   */
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31 * result + hashOrZero(this.name);
+    result = 31 * result + hashOrZero(this.columns);
+    return result;
+  }
+
+  /** Null-tolerant equality: true when both are null or {@code a.equals(b)}. */
+  private static boolean sameValue(Object a, Object b) {
+    return (a == null) ? (b == null) : a.equals(b);
+  }
+
+  /** Null-tolerant hash: 0 for null, otherwise {@code o.hashCode()}. */
+  private static int hashOrZero(Object o) {
+    return (o == null) ? 0 : o.hashCode();
+  }
+
+  /**
+   * Populates this struct from {@code iprot}, reading fields until the STOP
+   * marker. Unknown field ids and known ids with an unexpected wire type are
+   * skipped. Fields absent from the stream keep their current value.
+   *
+   * @throws TException if the protocol reports a read failure
+   */
+  public void read(TProtocol iprot) throws TException {
+    TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == TType.STOP) {
+        break;
+      }
+      switch (field.id)
+      {
+        case 1:
+          if (field.type == TType.STRING) {
+            this.name = iprot.readString();
+            this.__isset.name = true;
+          } else {
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2:
+          if (field.type == TType.LIST) {
+            {
+              TList listHeader = iprot.readListBegin();
+              this.columns = new ArrayList<column_t>(listHeader.size);
+              for (int i = 0; i < listHeader.size; ++i)
+              {
+                // one allocation per element (the generated code allocated twice)
+                column_t element = new column_t();
+                element.read(iprot);
+                this.columns.add(element);
+              }
+              iprot.readListEnd();
+            }
+            this.__isset.columns = true;
+          } else {
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          TProtocolUtil.skip(iprot, field.type);
+          break;
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+  }
+
+  /**
+   * Serializes this struct to {@code oprot}. Fields whose value is null are
+   * omitted from the stream entirely.
+   *
+   * @throws TException if the protocol reports a write failure
+   */
+  public void write(TProtocol oprot) throws TException {
+    TStruct struct = new TStruct("superColumn_t");
+    oprot.writeStructBegin(struct);
+    // a single TField instance is reused as the header for every field
+    TField field = new TField();
+    if (this.name != null) {
+      field.name = "name";
+      field.type = TType.STRING;
+      field.id = 1;
+      oprot.writeFieldBegin(field);
+      oprot.writeString(this.name);
+      oprot.writeFieldEnd();
+    }
+    if (this.columns != null) {
+      field.name = "columns";
+      field.type = TType.LIST;
+      field.id = 2;
+      oprot.writeFieldBegin(field);
+      {
+        oprot.writeListBegin(new TList(TType.STRUCT, this.columns.size()));
+        for (column_t column : this.columns) {
+          column.write(oprot);
+        }
+        oprot.writeListEnd();
+      }
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  /** Renders the struct as {@code superColumn_t(name:..,columns:..)}. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("superColumn_t(");
+    sb.append("name:");
+    sb.append(this.name);
+    sb.append(",columns:");
+    sb.append(this.columns);
+    sb.append(")");
+    return sb.toString();
+  }
+
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/test/DBTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/DBTest.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/DBTest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/DBTest.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test;
+
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Scanner;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.mapreduce.SequentialScanner;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.LogUtil;
+
+
+public class DBTest
+{
+ private static void doWrites() throws Throwable
+ {
+ for ( int i = 0; i < 512*1024; ++i )
+ {
+ String key = Integer.toString(i);
+ RowMutation rm = new RowMutation("Mailbox", key);
+ String value = "Data for key " + key;
+ rm.add("Test:" + "Column", value.getBytes(), i);
+ rm.apply();
+ }
+ System.out.println("Write done");
+ }
+
+ private static void doReads() throws Throwable
+ {
+ Table table = Table.open("Mailbox");
+ for ( int i = 100; i < 1000; ++i )
+ {
+ String key = Integer.toString(i);
+ Row row = table.getRow(key, "Test");
+ System.out.println( row.getColumnFamily("Test") );
+ System.out.println("Row read done");
+ ColumnFamily cf = table.get(key, "Test");
+ if (cf == null)
+ System.out.println("KEY " + key + " is missing");
+ else
+ {
+ Collection<IColumn> superColumns = cf.getAllColumns();
+ System.out.println("Success ...");
+ }
+ }
+ System.out.println("Read done ...");
+ }
+
+ private static void doRead(String key) throws Throwable
+ {
+ Table table = Table.open("Mailbox");
+ Row row = table.getRow(key, "Test");
+ ColumnFamily cf = table.get(key, "Test");
+ if (cf == null)
+ System.out.println("KEY " + key + " is missing");
+ else
+ {
+ Collection<IColumn> columns = cf.getAllColumns();
+ for ( IColumn column : columns )
+ {
+ System.out.println(column.name());
+ System.out.println( new String( column.value() ) );
+ }
+ }
+ System.out.println("Read done ...");
+ }
+
+ private static void doScannerTest() throws Throwable
+ {
+ Scanner scanner = new Scanner("Mailbox");
+ scanner.fetch(Integer.toString(105), "MailboxMailList0");
+
+ while ( scanner.hasNext() )
+ {
+ System.out.println(scanner.next().name());
+ }
+ }
+
+ private static void doSequentialScannerTest() throws Throwable
+ {
+ SequentialScanner scanner = new SequentialScanner("Mailbox");
+ while ( scanner.hasNext() )
+ {
+ Row row = scanner.next();
+ System.out.println( row.getColumnFamily("Test") );
+ System.out.println( row.getColumnFamily("Test2") );
+ }
+ }
+
+ public static void doTest()
+ {
+ String host = "insearch00";
+ String host2 = "insearch0";
+ Set<EndPoint> allNodes = new HashSet<EndPoint>();
+ for ( int i = 1; i <= 3; ++i )
+ {
+ if ( i < 10 )
+ allNodes.add( new EndPoint(host + i + ".sf2p.facebook.com", 7000) );
+ else
+ allNodes.add( new EndPoint(host2 + i + ".sf2p.facebook.com", 7000) );
+ }
+
+ for ( int i = 1; i <= 2; ++i )
+ {
+ if ( i < 10 )
+ allNodes.add( new EndPoint(host + i + ".ash1.facebook.com", 7000) );
+ else
+ allNodes.add( new EndPoint(host2 + i + ".ash1.facebook.com", 7000) );
+ }
+
+ TestChoice t = new TestChoice(allNodes);
+ t.assignReplicas();
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ /*
+ SSTable ssTable = new SSTable("C:\\Engagements\\", "Sample-Bf");
+ BloomFilter bf = new BloomFilter(512*1024, 15);
+ for ( int i = 0; i < 512*1024; ++i )
+ {
+ bf.fill( Integer.toString(i) );
+ }
+ ssTable.close(bf);
+ */
+ /*
+ IFileWriter writer = SequenceFile.bufferedWriter("C:\\Engagements\\Sample-Bf-Data.db", 4*1024*1024);
+ BloomFilter bf = new BloomFilter(512*1024, 15);
+ for ( int i = 0; i < 512*1024; ++i )
+ {
+ bf.fill( Integer.toString(i) );
+ }
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ BloomFilter.serializer().serialize(bf, bufOut);
+ bufOut.close();
+ writer.close(bufOut.getData(), bufOut.getLength());
+ writer.close();
+
+ IFileReader reader = SequenceFile.bufferedReader("C:\\Engagements\\Sample-Bf-Data.db", 4*1024*1024);
+ //DataOutputBuffer bufOut = new DataOutputBuffer();
+ bufOut.reset();
+ reader.next(bufOut);
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ bufIn.readUTF();
+ bufIn.readInt();
+ BloomFilter bf2 = BloomFilter.serializer().deserialize(bufIn);
+ int count = 0;
+ for ( int i = 0; i < 512*1024; ++i )
+ {
+ if ( !bf2.isPresent(Integer.toString(i)) )
+ ++count;
+ }
+ System.out.println(count);
+ reader.close();
+ */
+ //LogUtil.init();
+ //StorageService.instance().start();
+ //doWrites();
+ //doRead("543");
+
+ DatabaseDescriptor.init();
+ DBTest.doTest();
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/test/DataImporter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/DataImporter.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/DataImporter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/DataImporter.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,1631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test;
+
+import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TSocket;
+import com.facebook.thrift.transport.THttpClient;
+import com.facebook.thrift.transport.TFramedTransport;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadResponseMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.Cassandra;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.batch_mutation_super_t;
+import org.apache.cassandra.service.batch_mutation_t;
+import org.apache.cassandra.service.column_t;
+import org.apache.cassandra.service.superColumn_t;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.Token;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DataImporter {
+ /** Separator between the fields of one input record. */
+ private static final String delimiter_ = ",";
+
+ private static Logger logger_ = Logger.getLogger(DataImporter.class);
+
+ /** Table name passed to the single-column insert and to the read calls. */
+ private static final String tablename_ = "Mailbox";
+
+ /** Endpoint used as the sender of messages built by this class. */
+ public static EndPoint from_ = new EndPoint("172.21.211.181", 10001);
+
+ /** Endpoint that messages built by this class are sent to. */
+ public static EndPoint to_ = new EndPoint("hadoop071.sf2p.facebook.com",
+         7000);
+
+ public static EndPoint[] tos_ = new EndPoint[]{ new EndPoint("hadoop038.sf2p.facebook.com", 7000),
+         new EndPoint("hadoop039.sf2p.facebook.com", 7000),
+         new EndPoint("hadoop040.sf2p.facebook.com", 7000),
+         new EndPoint("hadoop041.sf2p.facebook.com", 7000)
+ };
+
+ /** Column family name (or name prefix) used by the insert and read helpers. */
+ private static final String columnFamily_ = "MailboxUserList";
+
+ /** Thrift client; starts out null and is replaced by apply() when it reconnects. */
+ private Cassandra.Client peerstorageClient_ = null;
+
+ /**
+  * Parses one comma-delimited record and inserts it as a single column
+  * through the Thrift client.
+  *
+  * Field order: first row-key part, second row-key part (joined with ':'),
+  * column name, integer timestamp, column value. Any further fields are
+  * ignored.
+  *
+  * @param line one input record
+  * @throws IOException declared for callers; insert failures are logged, not rethrown
+  */
+ public void test(String line) throws IOException {
+     StringTokenizer st = new StringTokenizer(line, delimiter_);
+     StringBuilder sb = new StringBuilder("");
+     String column = null;
+     int ts = 0;
+     String columnValue = null;
+
+     // The token is consumed before the switch so that surplus fields (the
+     // default case) advance the tokenizer too; previously they did not and
+     // the loop never terminated for records with more than five fields.
+     for (int i = 0; st.hasMoreTokens(); ++i) {
+         String field = st.nextToken();
+         switch (i) {
+         case 0:
+             sb.append(field);
+             sb.append(":");
+             break;
+
+         case 1:
+             sb.append(field);
+             break;
+
+         case 2:
+             column = field;
+             break;
+
+         case 3:
+             ts = Integer.parseInt(field);
+             break;
+
+         case 4:
+             columnValue = field;
+             break;
+
+         default:
+             break;
+         }
+     }
+
+     String rowKey = sb.toString();
+     try {
+         long t = System.currentTimeMillis();
+         peerstorageClient_.insert(tablename_, rowKey, columnFamily_ + ":"
+                 + column, columnValue, ts);
+         logger_.debug("Time taken for thrift..."
+                 + (System.currentTimeMillis() - t));
+     } catch (Exception e) {
+         // Best effort: one failed insert must not abort the import.
+         logger_.error("Thrift insert failed for row key " + rowKey, e);
+     }
+ }
+
+ private int roundRobin_ = 0 ;
+ private Random random_ = new Random();
+
+ public void apply(batch_mutation_t batchMutation) {
+
+ columnFamilyHack_++;
+ try {
+ Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+ long t = System.currentTimeMillis();
+ peerstorageClient_.batch_insert(batchMutation);
+ logger_.debug("Time taken for thrift..."
+ + (System.currentTimeMillis() - t));
+ } catch (Exception e) {
+ try {
+ peerstorageClient_ = connect();
+ peerstorageClient_.batch_insert(batchMutation);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+
+ public void apply(batch_mutation_super_t batchMutation) {
+
+ columnFamilyHack_++;
+ try {
+ Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+ long t = System.currentTimeMillis();
+ peerstorageClient_.batch_insert_superColumn(batchMutation);
+ logger_.debug("Time taken for thrift..."
+ + (System.currentTimeMillis() - t));
+ } catch (Exception e) {
+ try {
+ peerstorageClient_ = connect();
+ peerstorageClient_.batch_insert_superColumn(batchMutation);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+ /** Transport behind the most recently created client; closed on reconnect. */
+ TTransport transport_ = null;
+
+ /**
+  * Creates a new Thrift client on a fresh socket, closing the transport of
+  * the previous client first if there was one.
+  *
+  * The target host is currently hard-coded; the host list is used only to
+  * bound the round-robin counter.
+  *
+  * @return a client bound to the new transport. If opening the transport
+  *         failed, the failure is logged and the returned client wraps a
+  *         transport that is not open.
+  */
+ public Cassandra.Client connect() {
+     String[] hosts = new String[] { "hadoop038.sf2p.facebook.com",
+             "hadoop039.sf2p.facebook.com",
+             "hadoop040.sf2p.facebook.com",
+             "hadoop041.sf2p.facebook.com"
+     };
+     int port = 9160;
+
+     // Round-robin selection (hosts[roundRobin_]) is disabled; always the same node.
+     TSocket socket = new TSocket("hadoop071.sf2p.facebook.com", port);
+     // Tie the wrap-around to the list length instead of a literal 4.
+     roundRobin_ = (roundRobin_ + 1) % hosts.length;
+     if (transport_ != null) {
+         transport_.close();
+     }
+     transport_ = socket;
+
+     TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport_, false,
+             false);
+     Cassandra.Client peerstorageClient = new Cassandra.Client(
+             binaryProtocol);
+     try
+     {
+         transport_.open();
+     }
+     catch (Exception e)
+     {
+         logger_.error("Could not open Thrift transport on port " + port, e);
+     }
+     return peerstorageClient;
+ }
+
+ /**
+  * Tells whether a string can be parsed by Integer.parseInt.
+  *
+  * @param str candidate text; null is treated as not numeric
+  * @return true if parsing succeeds, false if it raises NumberFormatException
+  */
+ private static boolean isNumeric(String str)
+ {
+     boolean parsed;
+     try
+     {
+         Integer.parseInt(str);
+         parsed = true;
+     }
+     catch (NumberFormatException notAnInt)
+     {
+         parsed = false;
+     }
+     return parsed;
+ }
+
+ // Incremented by every apply() call; the batch runners append
+ // (columnFamilyHack_ % divideby_) to the column family name they write to.
+ int columnFamilyHack_ = 0 ;
+ // Modulus applied to columnFamilyHack_ when building column family names.
+ public int divideby_ = 4;
+
+ /**
+  * Reads a comma-delimited file and sends its records as batch mutations,
+  * one inbox and one outbox mutation per user.
+  *
+  * Field order: user, folder, threadId, lastUpdated, isDeleted; further
+  * fields are ignored. Leading tokens are skipped until one parses as an
+  * integer user id. Records are grouped by consecutive equal user values:
+  * when the user changes, the pending mutations are applied and new ones are
+  * started with keys "user:0" (folder 0) and "user:1" (any other folder).
+  *
+  * @param filepath path of the input file
+  * @throws IOException if the file cannot be opened or read
+  */
+ public void testBatchRunner(String filepath) throws IOException {
+     BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+             new FileInputStream(filepath)), 16 * 1024 * 1024);
+     try {
+         String line = null;
+         String firstuser = null;
+         batch_mutation_t rmInbox = null;
+         batch_mutation_t rmOutbox = null;
+         while ((line = bufReader.readLine()) != null) {
+             StringTokenizer st = new StringTokenizer(line, ",");
+             int i = 0;
+             String threadId = null;
+             int lastUpdated = 0;
+             int isDeleted = 0;
+             int folder = 0;
+             String user = null;
+             while (st.hasMoreTokens()) {
+                 // Consume first, so surplus fields cannot stall the loop.
+                 String field = st.nextToken();
+                 switch (i) {
+                 case 0:
+                     user = field;
+                     if (!isNumeric(user))
+                         continue; // i not advanced: retry with the next token
+                     break;
+                 case 1:
+                     folder = Integer.parseInt(field);
+                     break;
+                 case 2:
+                     threadId = field;
+                     break;
+                 case 3:
+                     lastUpdated = Integer.parseInt(field);
+                     break;
+                 case 4:
+                     isDeleted = Integer.parseInt(field);
+                     break;
+                 default:
+                     break;
+                 }
+                 ++i;
+             }
+             if (user == null) {
+                 continue; // line without any field; nothing to import
+             }
+
+             if (firstuser == null || firstuser.compareTo(user) != 0) {
+                 // User changed: flush what was collected for the previous one.
+                 firstuser = user;
+                 if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+                     apply(rmInbox);
+                 }
+                 if (rmOutbox != null && !rmOutbox.cfmap.isEmpty()) {
+                     apply(rmOutbox);
+                 }
+                 rmInbox = new batch_mutation_t();
+                 rmInbox.table = "Mailbox";
+                 rmInbox.key = firstuser + ":0";
+                 rmInbox.cfmap = new HashMap<String, List<column_t>>();
+
+                 rmOutbox = new batch_mutation_t();
+                 rmOutbox.table = "Mailbox";
+                 rmOutbox.key = firstuser + ":1";
+                 rmOutbox.cfmap = new HashMap<String, List<column_t>>();
+             }
+             column_t columnData = new column_t();
+             columnData.columnName = threadId;
+             columnData.value = String.valueOf(isDeleted);
+             columnData.timestamp = lastUpdated;
+
+             batch_mutation_t target = (folder == 0) ? rmInbox : rmOutbox;
+             String cfName = "MailboxUserList" + (columnFamilyHack_ % divideby_);
+             List<column_t> list = target.cfmap.get(cfName);
+             if (list == null) {
+                 list = new ArrayList<column_t>();
+                 target.cfmap.put(cfName, list);
+             }
+             list.add(columnData);
+         }
+         // Flush the mutations of the last user in the file.
+         if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+             apply(rmInbox);
+         }
+         if (rmOutbox != null && !rmOutbox.cfmap.isEmpty()) {
+             apply(rmOutbox);
+         }
+     } finally {
+         bufReader.close();
+     }
+ }
+
+
+ /**
+  * Reads a comma-delimited file and sends one batch mutation per user into
+  * the "MailboxMailList" column families.
+  *
+  * Field order: user, folder, threadId, lastUpdated, isDeleted; further
+  * fields are ignored. Leading tokens are skipped until one parses as an
+  * integer user id. The folder field is parsed but not used. Records are
+  * grouped by consecutive equal user values and the row key is the user.
+  *
+  * @param filepath path of the input file
+  * @throws IOException if the file cannot be opened or read
+  */
+ public void testMailboxBatchRunner(String filepath) throws IOException {
+     BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+             new FileInputStream(filepath)), 16 * 1024 * 1024);
+     try {
+         String line = null;
+         String firstuser = null;
+         batch_mutation_t rmInbox = null;
+         while ((line = bufReader.readLine()) != null) {
+             StringTokenizer st = new StringTokenizer(line, ",");
+             int i = 0;
+             String threadId = null;
+             int lastUpdated = 0;
+             int isDeleted = 0;
+             String user = null;
+             while (st.hasMoreTokens()) {
+                 // Consume first, so surplus fields cannot stall the loop.
+                 String field = st.nextToken();
+                 switch (i) {
+                 case 0:
+                     user = field;
+                     if (!isNumeric(user))
+                         continue; // i not advanced: retry with the next token
+                     break;
+                 case 1:
+                     Integer.parseInt(field); // folder: still validated, value unused
+                     break;
+                 case 2:
+                     threadId = field;
+                     break;
+                 case 3:
+                     lastUpdated = Integer.parseInt(field);
+                     break;
+                 case 4:
+                     isDeleted = Integer.parseInt(field);
+                     break;
+                 default:
+                     break;
+                 }
+                 ++i;
+             }
+             if (user == null) {
+                 continue; // line without any field; nothing to import
+             }
+
+             if (firstuser == null || firstuser.compareTo(user) != 0) {
+                 // User changed: flush what was collected for the previous one.
+                 firstuser = user;
+                 if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+                     apply(rmInbox);
+                 }
+                 rmInbox = new batch_mutation_t();
+                 rmInbox.table = "Mailbox";
+                 rmInbox.key = firstuser;
+                 rmInbox.cfmap = new HashMap<String, List<column_t>>();
+             }
+             column_t columnData = new column_t();
+             columnData.columnName = threadId;
+             columnData.value = String.valueOf(isDeleted);
+             columnData.timestamp = lastUpdated;
+
+             String cfName = "MailboxMailList" + (columnFamilyHack_ % divideby_);
+             List<column_t> list = rmInbox.cfmap.get(cfName);
+             if (list == null) {
+                 list = new ArrayList<column_t>();
+                 rmInbox.cfmap.put(cfName, list);
+             }
+             list.add(columnData);
+         }
+         // Flush the mutation of the last user in the file.
+         if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+             apply(rmInbox);
+         }
+     } finally {
+         bufReader.close();
+     }
+ }
+
+ public void testSuperBatchRunner(String filepath) throws IOException {
+ BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+ new FileInputStream(filepath)), 16 * 1024 * 1024);
+ String line = null;
+ String delimiter_ = new String(",");
+ String firstuser = null;
+ String nextuser = null;
+ batch_mutation_super_t rmInbox = null;
+ batch_mutation_super_t rmOutbox = null;
+ while ((line = bufReader.readLine()) != null) {
+ StringTokenizer st = new StringTokenizer(line, delimiter_);
+ int i = 0;
+ String threadId = null;
+ int lastUpdated = 0;
+ int isDeleted = 0;
+ int folder = 0;
+ int uid =0;
+ String user = null;
+ String subject = null;
+ String body = null;
+ while (st.hasMoreElements()) {
+ switch (i) {
+ case 0:
+ user = (String) st.nextElement();// sb.append((String)st.nextElement());
+ if ( !isNumeric(user))
+ continue;
+
+ break;
+
+ case 1:
+ folder = Integer.parseInt((String) st.nextElement());// sb.append((String)st.nextElement());
+ break;
+
+ case 2:
+ threadId = (String) st.nextElement();
+ break;
+
+ case 3:
+ lastUpdated = Integer.parseInt((String) st.nextElement());
+ break;
+
+ case 4:
+ isDeleted = Integer.parseInt((String) st.nextElement());// (String)st.nextElement();
+ break;
+
+ case 5:
+ st.nextElement();
+ break;
+
+ case 6:
+ st.nextElement();
+ break;
+
+ case 7:
+ subject = (String) st.nextElement();
+ break;
+
+ case 8:
+ body = (String) st.nextElement();
+ break;
+ default:
+ st.nextElement();
+ break;
+ }
+ ++i;
+ }
+
+ nextuser = user;
+ if (firstuser == null || firstuser.compareTo(nextuser) != 0) {
+ firstuser = nextuser;
+ if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+ fos_.write(rmInbox.key.getBytes());
+ fos_.write( System.getProperty("line.separator").getBytes());
+ counter_.incrementAndGet();
+ apply(rmInbox);
+ }
+ if (rmOutbox != null && !rmOutbox.cfmap.isEmpty()) {
+ fos_.write(rmOutbox.key.getBytes());
+ fos_.write( System.getProperty("line.separator").getBytes());
+ counter_.incrementAndGet();
+ apply(rmOutbox);
+ }
+ rmInbox = new batch_mutation_super_t();
+ rmInbox.table = "Mailbox";
+ rmInbox.key = firstuser ;//+ ":0";
+ rmInbox.cfmap = new HashMap<String, List<superColumn_t>>();
+
+ rmOutbox = new batch_mutation_super_t();
+ rmOutbox.table = "Mailbox";
+ rmOutbox.key = firstuser ;//+ ":1";
+ rmOutbox.cfmap = new HashMap<String, List<superColumn_t>>();
+ }
+ column_t columnData = new column_t();
+ columnData.columnName = threadId;
+ columnData.value = String.valueOf(isDeleted);
+ columnData.timestamp = lastUpdated;
+ // List <MboxStruct> list = userthreadmap.get(rs.getString(1));
+ if (folder == 0) {
+ List<superColumn_t> list = rmInbox.cfmap.get("MailboxThreadList"+(columnFamilyHack_%divideby_));
+ if (list == null) {
+ list = new ArrayList<superColumn_t>();
+ rmInbox.cfmap.put("MailboxThreadList"+(columnFamilyHack_%divideby_), list);
+ }
+ if( subject == null)
+ subject = "";
+ if( body == null )
+ body = "";
+ List<String> tokenList = tokenize(subject + " " + body);
+ for(String token : tokenList)
+ {
+ superColumn_t superColumn = new superColumn_t();
+ superColumn.name = token;
+ superColumn.columns = new ArrayList<column_t>();
+ superColumn.columns.add(columnData);
+ list.add(superColumn);
+ }
+ } else {
+ List<superColumn_t> list = rmOutbox.cfmap.get("MailboxThreadList"+(columnFamilyHack_%divideby_));
+ if (list == null) {
+ list = new ArrayList<superColumn_t>();
+ rmOutbox.cfmap.put("MailboxThreadList"+(columnFamilyHack_%divideby_), list);
+ }
+ if( subject == null)
+ subject = "";
+ if( body == null )
+ body = "";
+ List<String> tokenList = tokenize(subject + " " + body);
+ for(String token : tokenList)
+ {
+ superColumn_t superColumn = new superColumn_t();
+ superColumn.name = token;
+ superColumn.columns = new ArrayList<column_t>();
+ superColumn.columns.add(columnData);
+ list.add(superColumn);
+ }
+ }
+ }
+ if (firstuser != null) {
+ if (rmInbox != null && !rmInbox.cfmap.isEmpty()) {
+ fos_.write(rmInbox.key.getBytes());
+ fos_.write( System.getProperty("line.separator").getBytes());
+ counter_.incrementAndGet();
+ apply(rmInbox);
+ }
+ if (rmOutbox != null && !rmOutbox.cfmap.isEmpty()) {
+ fos_.write(rmOutbox.key.getBytes());
+ fos_.write( System.getProperty("line.separator").getBytes());
+ counter_.incrementAndGet();
+ apply(rmOutbox);
+ }
+ }
+ /* Added the thrift call to storage. */
+ }
+
+ /**
+  * Splits a string with StringTokenizer and returns the tokens as an array.
+  *
+  * @param str   text to split
+  * @param delim delimiter characters, as understood by StringTokenizer
+  * @return the tokens in order; empty when str contains no token
+  */
+ public static String[] getTokens(String str, String delim)
+ {
+     StringTokenizer tokenizer = new StringTokenizer(str, delim);
+     String[] parts = new String[tokenizer.countTokens()];
+     for (int idx = 0; idx < parts.length; idx++)
+     {
+         parts[idx] = tokenizer.nextToken();
+     }
+     return parts;
+ }
+
+ /**
+  * Tells whether a user appears in a list of names.
+  *
+  * @param user name to look for
+  * @param list candidate names
+  * @return true if at least one entry equals user
+  */
+ public static boolean checkUser(String user, String[] list)
+ {
+     for (int idx = 0; idx < list.length; idx++)
+     {
+         if (user.equals(list[idx]))
+         {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+  * Reads a comma-delimited file and sends super-column batch mutations in
+  * which the other parties of a thread become super columns holding the
+  * thread's column.
+  *
+  * Field order: user, folder, threadId, lastUpdated, isDeleted, authors,
+  * participants, subject, body; further fields are ignored. Authors and
+  * participants are ':'-separated. If the user is one of the authors the
+  * participants are used as super column names, otherwise the authors.
+  * Leading tokens are skipped until one parses as an integer user id.
+  * Records are grouped by consecutive equal user values. Folder 0 goes to
+  * the inbox mutation, any other folder to the outbox mutation; both use
+  * the user as row key. Before a mutation is applied its key is appended to
+  * fos_ and counter_ is incremented.
+  *
+  * @param filepath path of the input file
+  * @throws IOException if the input cannot be read or the key log cannot be written
+  */
+ public void testSuperUserBatchRunner(String filepath) throws IOException {
+     BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+             new FileInputStream(filepath)), 16 * 1024 * 1024);
+     try {
+         String line = null;
+         String firstuser = null;
+         batch_mutation_super_t rmInbox = null;
+         batch_mutation_super_t rmOutbox = null;
+         while ((line = bufReader.readLine()) != null) {
+             StringTokenizer st = new StringTokenizer(line, ",");
+             int i = 0;
+             String threadId = null;
+             int lastUpdated = 0;
+             int isDeleted = 0;
+             int folder = 0;
+             String user = null;
+             String authors = null;
+             String participants = null;
+             while (st.hasMoreTokens()) {
+                 String field = st.nextToken();
+                 switch (i) {
+                 case 0:
+                     user = field;
+                     if (!isNumeric(user))
+                         continue; // i not advanced: retry with the next token
+                     break;
+                 case 1:
+                     folder = Integer.parseInt(field);
+                     break;
+                 case 2:
+                     threadId = field;
+                     break;
+                 case 3:
+                     lastUpdated = Integer.parseInt(field);
+                     break;
+                 case 4:
+                     isDeleted = Integer.parseInt(field);
+                     break;
+                 case 5:
+                     authors = field;
+                     break;
+                 case 6:
+                     participants = field;
+                     break;
+                 default:
+                     break; // subject, body and any later fields are not used here
+                 }
+                 ++i;
+             }
+             if (user == null) {
+                 continue; // line without any field; nothing to import
+             }
+
+             if (firstuser == null || firstuser.compareTo(user) != 0) {
+                 // User changed: log and flush the previous user's mutations.
+                 firstuser = user;
+                 for (batch_mutation_super_t pending : new batch_mutation_super_t[] { rmInbox, rmOutbox }) {
+                     if (pending != null && !pending.cfmap.isEmpty()) {
+                         fos_.write(pending.key.getBytes());
+                         fos_.write(System.getProperty("line.separator").getBytes());
+                         counter_.incrementAndGet();
+                         apply(pending);
+                     }
+                 }
+                 rmInbox = new batch_mutation_super_t();
+                 rmInbox.table = "Mailbox";
+                 rmInbox.key = firstuser;
+                 rmInbox.cfmap = new HashMap<String, List<superColumn_t>>();
+
+                 rmOutbox = new batch_mutation_super_t();
+                 rmOutbox.table = "Mailbox";
+                 rmOutbox.key = firstuser;
+                 rmOutbox.cfmap = new HashMap<String, List<superColumn_t>>();
+             }
+             column_t columnData = new column_t();
+             columnData.columnName = threadId;
+             columnData.value = String.valueOf(isDeleted);
+             columnData.timestamp = lastUpdated;
+
+             batch_mutation_super_t target = (folder == 0) ? rmInbox : rmOutbox;
+             String cfName = "MailboxUserList" + (columnFamilyHack_ % divideby_);
+             List<superColumn_t> list = target.cfmap.get(cfName);
+             if (list == null) {
+                 list = new ArrayList<superColumn_t>();
+                 target.cfmap.put(cfName, list);
+             }
+             if (authors == null)
+                 authors = "";
+             if (participants == null)
+                 participants = "";
+             String[] authorList = getTokens(authors, ":");
+             String[] partList = getTokens(participants, ":");
+             // Index under "the other side": participants for an author, authors otherwise.
+             String[] tokenList = checkUser(user, authorList) ? partList : authorList;
+             for (String token : tokenList) {
+                 superColumn_t superColumn = new superColumn_t();
+                 superColumn.name = token;
+                 superColumn.columns = new ArrayList<column_t>();
+                 superColumn.columns.add(columnData);
+                 list.add(superColumn);
+             }
+         }
+         // Log and flush the mutations of the last user in the file.
+         for (batch_mutation_super_t pending : new batch_mutation_super_t[] { rmInbox, rmOutbox }) {
+             if (pending != null && !pending.cfmap.isEmpty()) {
+                 fos_.write(pending.key.getBytes());
+                 fos_.write(System.getProperty("line.separator").getBytes());
+                 counter_.incrementAndGet();
+                 apply(pending);
+             }
+         }
+     } finally {
+         bufReader.close();
+     }
+ }
+
+
+
+ // These private fields are declared here, rather than at the top of the
+ // class, because they make more sense next to the functions below that
+ // use them.
+
+ private int numCreated_ = 0;
+
+ // Thread factory handed to the scheduled pool; set in the constructor.
+ ThreadFactory tf_ = null;
+
+ // Scheduled executor created in the constructor.
+ ScheduledExecutorService pool_ = null;
+
+ // Pacing rate: callers sleep 1000/requestsPerSecond_ ms around each request.
+ private int requestsPerSecond_ = 50;
+ // Output stream for the key log; opened on "keys.dat" in the constructor.
+ public FileOutputStream fos_ = null;
+ // Incremented once per applied super-column mutation and per Task run.
+ private AtomicInteger counter_ = new AtomicInteger(0);
+
+ // This is the task that gets scheduled
+ // This could be different for different kind of tasks
+ class Task implements Runnable {
+ RowMutationMessage rmMsg_ = null;
+
+ public Task(RowMutationMessage rmMsg) {
+ rmMsg_ = rmMsg;
+ }
+
+ public void run() {
+ try {
+ long t = System.currentTimeMillis();
+ counter_.incrementAndGet();
+ Message message = new Message(DataImporter.from_,
+ StorageService.mutationStage_,
+ StorageService.mutationVerbHandler_,
+ new Object[] { rmMsg_ });
+ MessagingService.getMessagingInstance().sendOneWay(message,
+ DataImporter.to_);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+  * Sets up the "LOAD-GENERATOR" thread factory, a 100-thread scheduled pool
+  * built on it, and opens "keys.dat" in append mode for the key log.
+  *
+  * @throws Throwable if any of these cannot be created
+  */
+ public DataImporter() throws Throwable {
+     this.tf_ = new ThreadFactoryImpl("LOAD-GENERATOR");
+     this.pool_ = new DebuggableScheduledThreadPoolExecutor(100, this.tf_);
+     this.fos_ = new FileOutputStream("keys.dat", true);
+ }
+
+ // Number of failed lookups (missing row, column family or column) counted by the read tests.
+ public long errorCount_ = 0;
+
+ // Number of lookups counted by the read tests.
+ public long queryCount_ = 0;
+
+ public void testRead(String filepath) throws Throwable {
+ BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+ new FileInputStream(filepath)), 16 * 1024 * 1024);
+ String line = null;
+ String delimiter_ = new String(",");
+ RowMutationMessage rmInbox = null;
+ RowMutationMessage rmOutbox = null;
+ ColumnFamily cfInbox = null;
+ ColumnFamily cfOutbox = null;
+ while ((line = bufReader.readLine()) != null) {
+ StringTokenizer st = new StringTokenizer(line, delimiter_);
+ int i = 0;
+ String threadId = null;
+ int lastUpdated = 0;
+ int isDeleted = 0;
+ int folder = 0;
+ String user = null;
+ while (st.hasMoreElements()) {
+ switch (i) {
+ case 0:
+ user = (String) st.nextElement();// sb.append((String)st.nextElement());
+ break;
+
+ case 1:
+ folder = Integer.parseInt((String) st.nextElement());// sb.append((String)st.nextElement());
+ break;
+
+ case 2:
+ threadId = (String) st.nextElement();
+ break;
+
+ case 3:
+ lastUpdated = Integer.parseInt((String) st.nextElement());
+ break;
+
+ case 4:
+ isDeleted = Integer.parseInt((String) st.nextElement());// (String)st.nextElement();
+ break;
+
+ default:
+ break;
+ }
+ ++i;
+ }
+ String key = null;
+ if (folder == 0) {
+ key = user + ":0";
+ } else {
+ key = user + ":1";
+ }
+
+ ReadMessage readMessage = new ReadMessage(tablename_, key);
+ Message message = new Message(from_, StorageService.readStage_,
+ StorageService.readVerbHandler_,
+ new Object[] { readMessage });
+ IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(
+ message, to_);
+ Object[] result = iar.get();
+ ReadResponseMessage readResponseMessage = (ReadResponseMessage) result[0];
+ Row row = readResponseMessage.row();
+ if (row == null) {
+ logger_.debug("ERROR No row for this key .....: " + line);
+ Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+ errorCount_++;
+ } else {
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+ if (cfMap == null || cfMap.size() == 0) {
+ logger_
+ .debug("ERROR ColumnFamil map is missing.....: "
+ + threadId + " key:" + key
+ + " record:" + line);
+ System.out
+ .println("ERROR ColumnFamil map is missing.....: "
+ + threadId + " key:" + key
+ + " record:" + line);
+ errorCount_++;
+ continue;
+ }
+ ColumnFamily cfamily = cfMap.get(columnFamily_);
+ if (cfamily == null) {
+ logger_
+ .debug("ERROR ColumnFamily is missing.....: "
+ + threadId + " key:" + key
+ + " record:" + line);
+ System.out
+ .println("ERROR ColumnFamily is missing.....: "
+ + threadId + " key:" + key
+ + " record:" + line);
+ errorCount_++;
+ continue;
+ }
+
+ IColumn clmn = cfamily.getColumn(threadId);
+ queryCount_++;
+ if (clmn == null) {
+ logger_.debug("ERROR Column is missing.....: " + threadId
+ + " record:" + line);
+ System.out.println("ERROR Column is missing.....: "
+ + threadId + " record:" + line);
+ Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+ errorCount_++;
+ } else {
+ // logger_.debug("SUCCESS .....for column : "+clmn.name()+"
+ // Record:" + line);
+ Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+ }
+ }
+
+ }
+
+ }
+
+ /**
+  * Runs a string through a StandardAnalyzer and collects the term text of
+  * every token it produces.
+  *
+  * @param string text to analyze
+  * @return the term texts in stream order; on an IOException the stack
+  *         trace is printed and the terms collected so far are returned
+  */
+ private List<String> tokenize(String string)
+ {
+     List<String> terms = new ArrayList<String>();
+     TokenStream stream = new StandardAnalyzer().tokenStream("superColumn", new StringReader(string));
+     try
+     {
+         for (Token current = stream.next(); current != null; current = stream.next())
+         {
+             terms.add(current.termText());
+         }
+     }
+     catch(IOException ex)
+     {
+         ex.printStackTrace();
+     }
+     return terms;
+ }
+ // Number of get_slice requests issued by the Thrift read tests.
+ private int numReqs_ = 0;
+ // Sum of the measured get_slice durations in milliseconds; divided by numReqs_ for the logged average.
+ private long totalTime_ = 0 ;
+
+ public void testReadThrift(String filepath) throws Throwable {
+ BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+ new FileInputStream(filepath)), 16 * 1024 * 1024);
+ String line = null;
+ String delimiter_ = new String(",");
+ RowMutationMessage rmInbox = null;
+ RowMutationMessage rmOutbox = null;
+ ColumnFamily cfInbox = null;
+ ColumnFamily cfOutbox = null;
+ String firstuser = null ;
+ String nextuser = null;
+ while ((line = bufReader.readLine()) != null) {
+ StringTokenizer st = new StringTokenizer(line, delimiter_);
+ int i = 0;
+ String threadId = null;
+ int lastUpdated = 0;
+ int isDeleted = 0;
+ int folder = 0;
+ String user = null;
+ while (st.hasMoreElements()) {
+ switch (i) {
+ case 0:
+ user = (String) st.nextElement();// sb.append((String)st.nextElement());
+ if ( !isNumeric(user))
+ continue;
+ break;
+
+ case 1:
+ folder = Integer.parseInt((String) st.nextElement());// sb.append((String)st.nextElement());
+ break;
+
+ case 2:
+ threadId = (String) st.nextElement();
+ break;
+
+ case 3:
+ lastUpdated = Integer.parseInt((String) st.nextElement());
+ break;
+
+ case 4:
+ isDeleted = Integer.parseInt((String) st.nextElement());// (String)st.nextElement();
+ break;
+
+ default:
+ break;
+ }
+ ++i;
+ }
+ String key = null;
+ if (folder == 0) {
+ key = user + ":0";
+ } else {
+ key = user + ":1";
+ }
+
+ nextuser = key;
+ if(firstuser == null || firstuser.compareTo(nextuser) != 0)
+ {
+ List<column_t> columns = null;
+ firstuser = key;
+ try {
+ Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+ long t = System.currentTimeMillis();
+
+ columns = peerstorageClient_.get_slice(tablename_,key,columnFamily_+(columnFamilyHack_%divideby_),0,10);
+ numReqs_++;
+ totalTime_ = totalTime_ + (System.currentTimeMillis() - t);
+ logger_.debug("Numreqs:" + numReqs_ + " Average: " + totalTime_/numReqs_+ " Time taken for thrift..."
+ + (System.currentTimeMillis() - t));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ if (columns == null) {
+ logger_.debug("ERROR No row for this key .....: " + line);
+ Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+ errorCount_++;
+ } else {
+ if (columns.size() == 0) {
+ logger_
+ .debug("ERROR ColumnFamil map is missing.....: "
+ + threadId + " key:" + key
+ + " record:" + line);
+ System.out
+ .println("ERROR ColumnFamil map is missing.....: "
+ + threadId + " key:" + key
+ + " record:" + line);
+ errorCount_++;
+ continue;
+ }
+ else {
+ //logger_.debug("SUCCESS .....for key : "+key);
+ //System.out.println("SUCCESS .....for key : "+key);
+ //for(int j = 0 ; j< columns.size() ; j++ ){
+ //System.out.print(" " + columns.get(j)+",");
+ //}
+ // Record:" + line);
+ //Thread.sleep(5);
+ }
+ }
+ queryCount_++;
+ }
+ }
+
+ }
+
+
+ public void testSuperReadThrift(String filepath) throws Throwable {
+ BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+ new FileInputStream(filepath)), 16 * 1024 * 1024);
+ String line = null;
+ String delimiter_ = new String(",");
+ RowMutationMessage rmInbox = null;
+ RowMutationMessage rmOutbox = null;
+ ColumnFamily cfInbox = null;
+ ColumnFamily cfOutbox = null;
+ String firstuser = null ;
+ String nextuser = null;
+ while ((line = bufReader.readLine()) != null) {
+ StringTokenizer st = new StringTokenizer(line, delimiter_);
+ int i = 0;
+ String threadId = null;
+ int lastUpdated = 0;
+ int isDeleted = 0;
+ int folder = 0;
+ String user = null;
+ String subject = null;
+ String body = null;
+ while (st.hasMoreElements()) {
+ switch (i) {
+ case 0:
+ user = (String) st.nextElement();// sb.append((String)st.nextElement());
+ if ( !isNumeric(user))
+ continue;
+ break;
+
+ case 1:
+ folder = Integer.parseInt((String) st.nextElement());// sb.append((String)st.nextElement());
+ break;
+
+ case 2:
+ threadId = (String) st.nextElement();
+ break;
+
+ case 3:
+ lastUpdated = Integer.parseInt((String) st.nextElement());
+ break;
+
+ case 4:
+ isDeleted = Integer.parseInt((String) st.nextElement());// (String)st.nextElement();
+ break;
+
+ case 5:
+ st.nextElement();
+ break;
+
+
+ case 6:
+ st.nextElement();
+ break;
+
+ case 7:
+ subject = (String) st.nextElement();
+ break;
+
+ case 8:
+ body = (String) st.nextElement();
+ break;
+ default:
+ st.nextElement();
+ break;
+ }
+ ++i;
+ }
+ String key = null;
+ if (folder == 0) {
+ key = user ;//+ ":0";
+ } else {
+ key = user ;//+ ":1";
+ }
+
+ List<column_t> columns = null;
+ firstuser = key;
+ try {
+ Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+ if( subject == null )
+ subject = "";
+ if( body == null )
+ body = "";
+ List<String> tokenList = tokenize(subject + " " + body ) ;
+
+ for( String token: tokenList )
+ {
+ long t = System.currentTimeMillis();
+ columns = peerstorageClient_.get_slice(tablename_,key,"MailboxThreadList"+(columnFamilyHack_%divideby_)+":"+token,0,10);
+ totalTime_ = totalTime_ + (System.currentTimeMillis() - t);
+ numReqs_++;
+ logger_.debug("Numreqs:" + numReqs_ + " Average: " + totalTime_/numReqs_+ " Time taken for thrift..."
+ + (System.currentTimeMillis() - t));
+ if (columns == null) {
+ logger_.debug(" TOKEN: " + token + " ERROR No row for this key .....: " + line);
+ Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+ errorCount_++;
+ } else {
+ if (columns.size() == 0) {
+ logger_
+ .debug("ERROR ColumnFamil map is missing.....: "
+ + threadId + " key:" + key
+ + " TOKEN: " + token
+ + " record:" + line);
+ System.out
+ .println("ERROR ColumnFamil map is missing.....: "
+ + threadId + " key:" + key
+ + " TOKEN: " + token
+ + " record:" + line);
+ errorCount_++;
+ continue;
+ }
+ else {
+ boolean found = false;
+ for(column_t column : columns)
+ {
+ if(column.columnName.equalsIgnoreCase(threadId))
+ {
+ found = true ;
+ break;
+ }
+ }
+ if(!found)
+ {
+ logger_
+ .debug("ERROR column is missing.....: "
+ + threadId + " key:" + key
+ + " TOKEN: " + token
+ + " record:" + line);
+ System.out
+ .println("ERROR column is missing.....: "
+ + threadId + " key:" + key
+ + " TOKEN: " + token
+ + " record:" + line);
+ errorCount_++;
+
+ }
+ //logger_.debug("SUCCESS .....for key : "+key);
+ //System.out.println("SUCCESS .....for key : "+key);
+ //for(int j = 0 ; j< columns.size() ; j++ ){
+ //System.out.print(" " + columns.get(j)+",");
+ //}
+ // Record:" + line);
+ //Thread.sleep(5);
+ }
+ }
+ }
+ queryCount_++;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ }
+
+
+
+ /**
+  * Replays one comma-delimited mailbox dump file as row mutations sent through
+  * applyLoad.
+  *
+  * Fields read from each line, by position: 0 = user, 1 = folder,
+  * 2 = thread id, 3 = last-updated, 4 = is-deleted; any further fields are
+  * ignored. Consecutive records of the same user are collected into two
+  * mutations, keyed user + ":0" (folder 0) and user + ":1" (any other folder).
+  * Both mutations are sent when the user changes and once more at end of file.
+  *
+  * @param filepath path of the dump file to replay
+  * @throws Throwable if the file cannot be read or a numeric field is malformed
+  */
+ public void testLoadGeneratorBatchRunner(String filepath) throws Throwable
+ {
+     BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+             new FileInputStream(filepath)), 16 * 1024 * 1024);
+     try
+     {
+         String line = null;
+         String firstuser = null;
+         RowMutation rmInbox = null;
+         RowMutation rmOutbox = null;
+         while ((line = bufReader.readLine()) != null)
+         {
+             StringTokenizer st = new StringTokenizer(line, ",");
+             int i = 0;
+             String threadId = null;
+             int lastUpdated = 0;
+             int isDeleted = 0;
+             int folder = 0;
+             String user = null;
+             while (st.hasMoreElements())
+             {
+                 switch (i)
+                 {
+                 case 0:
+                     user = (String) st.nextElement();
+                     break;
+
+                 case 1:
+                     folder = Integer.parseInt((String) st.nextElement());
+                     break;
+
+                 case 2:
+                     threadId = (String) st.nextElement();
+                     break;
+
+                 case 3:
+                     lastUpdated = Integer.parseInt((String) st.nextElement());
+                     break;
+
+                 case 4:
+                     isDeleted = Integer.parseInt((String) st.nextElement());
+                     break;
+
+                 default:
+                     // Trailing fields must still be consumed; otherwise
+                     // hasMoreElements() stays true and this loop never ends.
+                     st.nextElement();
+                     break;
+                 }
+                 ++i;
+             }
+
+             if (user == null)
+             {
+                 // A line without any field carries no record; comparing it
+                 // against the current user below would throw.
+                 continue;
+             }
+
+             if (firstuser == null || firstuser.compareTo(user) != 0)
+             {
+                 // The user changed: flush the previous user's batches and
+                 // start fresh ones.
+                 firstuser = user;
+                 if (rmInbox != null)
+                 {
+                     applyLoad(rmInbox);
+                 }
+                 if (rmOutbox != null)
+                 {
+                     applyLoad(rmOutbox);
+                 }
+                 rmInbox = new RowMutation(tablename_, firstuser + ":0");
+                 rmOutbox = new RowMutation(tablename_, firstuser + ":1");
+             }
+
+             if (folder == 0)
+             {
+                 rmInbox.add(columnFamily_+(columnFamilyHack_%divideby_)+":"+threadId, String.valueOf(isDeleted).getBytes(), lastUpdated);
+             }
+             else
+             {
+                 rmOutbox.add(columnFamily_+(columnFamilyHack_%divideby_)+":"+threadId, String.valueOf(isDeleted).getBytes(), lastUpdated);
+             }
+         }
+
+         // Flush whatever was collected for the last user in the file.
+         if (firstuser != null)
+         {
+             if (rmInbox != null)
+             {
+                 applyLoad(rmInbox);
+             }
+             if (rmOutbox != null)
+             {
+                 applyLoad(rmOutbox);
+             }
+         }
+     }
+     finally
+     {
+         bufReader.close();
+     }
+ }
+
+ /**
+  * Sends one row mutation and then pauses to throttle the caller.
+  *
+  * The mutation is wrapped in a RowMutationMessage and handed to
+  * MessagingService.sendRR, addressed to the EndPoint built from the literal
+  * 7000. Afterwards the thread always sleeps for 1000 / requestsPerSecond_
+  * milliseconds (the remainder is passed as the nanosecond argument). The
+  * resulting rate therefore matches requestsPerSecond_ only if the caller does
+  * no other waiting. Any exception raised while sending or sleeping is printed
+  * and swallowed.
+  *
+  * @param rm the mutation to send
+  * @throws IOException declared for callers; failures inside the body are caught
+  */
+ public void applyLoad(RowMutation rm) throws IOException {
+     try
+     {
+         counter_.incrementAndGet();
+         columnFamilyHack_++;
+
+         EndPoint target = new EndPoint(7000);
+         Object[] payload = new Object[]{ new RowMutationMessage(rm) };
+         Message request = new Message(target,
+                 StorageService.mutationStage_,
+                 StorageService.mutationVerbHandler_,
+                 payload);
+         MessagingService.getMessagingInstance().sendRR(request, target);
+
+         Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+     }
+     catch (Exception ex)
+     {
+         ex.printStackTrace();
+     }
+ }
+
+
+
+ /**
+  * Replays one comma-delimited mailbox dump file as Thrift remove calls.
+  *
+  * Fields read from each line, by position: 0 = user, 1 = folder,
+  * 2 = thread id; fields 3 and 4 are parsed as integers and any further
+  * fields are ignored. The row key is user + ":0" for folder 0 and
+  * user + ":1" otherwise. A remove is issued only for a record whose key
+  * differs from the previous record's key. Exceptions raised by the remove
+  * are printed and the next record is processed.
+  *
+  * @param filepath path of the dump file to replay
+  * @throws Throwable if the file cannot be read or a numeric field is malformed
+  */
+ public void testRemove(String filepath) throws Throwable {
+     BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+             new FileInputStream(filepath)), 16 * 1024 * 1024);
+     try {
+         String line = null;
+         String firstuser = null;
+         while ((line = bufReader.readLine()) != null) {
+             StringTokenizer st = new StringTokenizer(line, ",");
+             int i = 0;
+             String threadId = null;
+             int folder = 0;
+             String user = null;
+             while (st.hasMoreElements()) {
+                 switch (i) {
+                 case 0:
+                     user = (String) st.nextElement();
+                     // A non-numeric token is dropped without advancing i, so the
+                     // following token is tried as the user id instead.
+                     if (!isNumeric(user))
+                         continue;
+                     break;
+
+                 case 1:
+                     folder = Integer.parseInt((String) st.nextElement());
+                     break;
+
+                 case 2:
+                     threadId = (String) st.nextElement();
+                     break;
+
+                 case 3: // last-updated
+                 case 4: // is-deleted
+                     // The values are not needed for removes; parsing is kept so
+                     // a malformed record still fails with NumberFormatException.
+                     Integer.parseInt((String) st.nextElement());
+                     break;
+
+                 default:
+                     // Trailing fields must still be consumed; otherwise
+                     // hasMoreElements() stays true and this loop never ends.
+                     st.nextElement();
+                     break;
+                 }
+                 ++i;
+             }
+
+             String key = user + (folder == 0 ? ":0" : ":1");
+
+             if (firstuser == null || firstuser.compareTo(key) != 0)
+             {
+                 firstuser = key;
+                 try {
+                     Thread.sleep(1000/requestsPerSecond_, 1000%requestsPerSecond_);
+                     long t = System.currentTimeMillis();
+
+                     // NOTE(review): unlike the read and write paths, this column
+                     // path carries no column-family name prefix - confirm intended.
+                     peerstorageClient_.remove(tablename_,key,(columnFamilyHack_%divideby_)+":"+threadId);
+                     numReqs_++;
+                     totalTime_ = totalTime_ + (System.currentTimeMillis() - t);
+                     logger_.debug("Numreqs:" + numReqs_ + " Average: " + totalTime_/numReqs_+ " Time taken for thrift..."
+                             + (System.currentTimeMillis() - t));
+                 } catch (Exception e) {
+                     e.printStackTrace();
+                 }
+             }
+         }
+     } finally {
+         // Release the file handle even when a record aborts the replay.
+         bufReader.close();
+     }
+ }
+
+ /**
+  * Command-line driver: runs the selected test over a slice of the files in a
+  * directory.
+  *
+  * Arguments:
+  *   args[0] option, one of -testWriteMailbox, -testSuperUserWrite, -testPhp,
+  *           -testWrite, -testRead, -testWriteSuper, -testReadSuper, -testRemove
+  *   args[1] index of this server, or -1 to process every file
+  *   args[2] directory holding the input files
+  *   args[3] optional requests per second
+  *   args[4] optional total number of servers (default 5)
+  *   args[5] optional value for divideby_
+  *
+  * With an index other than -1 the directory listing is cut into
+  * totalNumofServers slices of equal size and this server takes slice number
+  * index; the last server also takes the remainder. Any other option prints
+  * "Invalid option".
+  *
+  * @param args command-line arguments as described above
+  * @throws IllegalArgumentException if required arguments are missing, the
+  *         index or server count is out of range, or args[2] cannot be listed
+  * @throws Throwable anything raised by the selected test
+  */
+ public void run(String[] args) throws Throwable
+ {
+     String option = (args == null || args.length == 0) ? null : args[0];
+     boolean knownOption = "-testWriteMailbox".equals(option)
+             || "-testSuperUserWrite".equals(option)
+             || "-testPhp".equals(option)
+             || "-testWrite".equals(option)
+             || "-testRead".equals(option)
+             || "-testWriteSuper".equals(option)
+             || "-testReadSuper".equals(option)
+             || "-testRemove".equals(option);
+     if (!knownOption)
+     {
+         System.out.println("Invalid option");
+         return;
+     }
+     if (args.length < 3)
+     {
+         throw new IllegalArgumentException("Usage: " + option
+                 + " <serverIndex> <directory> [requestsPerSecond] [totalServers] [divideBy]");
+     }
+
+     int totalNumofServers = 5; // total number of servers we want to run on
+     if (args.length > 4)
+     {
+         totalNumofServers = Integer.parseInt(args[4]);
+     }
+     if (args.length > 5)
+     {
+         divideby_ = Integer.parseInt(args[5]);
+     }
+     int index = Integer.parseInt(args[1]);
+     if (index < -1 || totalNumofServers <= 0)
+     {
+         throw new IllegalArgumentException("serverIndex must be >= -1 and totalServers > 0, got "
+                 + index + " and " + totalNumofServers);
+     }
+
+     // List the directory once: File.list() returns null when the path is not
+     // a listable directory, and repeated calls need not agree on order.
+     File file = new File(args[2]);
+     String[] fileNames = file.list();
+     if (fileNames == null)
+     {
+         throw new IllegalArgumentException("Not a readable directory: " + args[2]);
+     }
+     int fileCount = fileNames.length;
+
+     int start = 0;
+     int end = fileCount;
+     if (index != -1)
+     {
+         int skip = fileCount / totalNumofServers;
+         if (fileCount != 0 && skip == 0)
+         {
+             skip = 1;
+         }
+         // Clamp both bounds so an index beyond the available files yields an
+         // empty slice instead of indexing past the listing.
+         start = Math.min(skip * index, fileCount);
+         if (index == totalNumofServers - 1)
+         {
+             end = fileCount;
+         }
+         else
+         {
+             end = Math.min(start + skip, fileCount);
+         }
+     }
+     if (args.length > 3)
+     {
+         requestsPerSecond_ = Integer.parseInt(args[3]);
+     }
+
+     long t = System.currentTimeMillis();
+     peerstorageClient_ = connect();
+     String separator = System.getProperty("file.separator");
+     for (int i = start; i < end; i++)
+     {
+         String path = args[2] + separator + fileNames[i];
+         if (option.equals("-testRead"))
+         {
+             testReadThrift(path);
+         }
+         else if (option.equals("-testWrite"))
+         {
+             testBatchRunner(path);
+         }
+         else if (option.equals("-testWriteSuper"))
+         {
+             testSuperBatchRunner(path);
+         }
+         else if (option.equals("-testReadSuper"))
+         {
+             testSuperReadThrift(path);
+         }
+         else if (option.equals("-testRemove"))
+         {
+             testRemove(path);
+         }
+         else if (option.equals("-testPhp"))
+         {
+             testPhp(path);
+         }
+         else if (option.equals("-testSuperUserWrite"))
+         {
+             testSuperUserBatchRunner(path);
+         }
+         else if (option.equals("-testWriteMailbox"))
+         {
+             testMailboxBatchRunner(path);
+         }
+         System.out.println(path);
+     }
+     if (transport_ != null)
+         transport_.close();
+     System.out.println("start :" + start + " end : " + end);
+     System.out.println("Time taken .."
+             + (System.currentTimeMillis() - t));
+     System.out.println("Keys sent over: " + counter_.get());
+     fos_.close();
+ }
+
+
+
+ /**
+  * Launches the PHP search-test script once for every record of a
+  * comma-delimited mailbox dump file.
+  *
+  * Fields read from each line, by position: 0 = user, 2 = thread id;
+  * fields 1, 3 and 4 are parsed as integers and any further fields are
+  * ignored. The command line is the script path followed by the file name,
+  * the user, the thread id and the raw line. The started process is not
+  * waited for.
+  *
+  * @param filepath path of the dump file to replay
+  * @throws IOException if the file cannot be read or the process cannot start
+  */
+ public void testPhp(String filepath) throws IOException {
+     BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+             new FileInputStream(filepath)), 16 * 1024 * 1024);
+     try {
+         String line = null;
+         while ((line = bufReader.readLine()) != null) {
+             StringTokenizer st = new StringTokenizer(line, ",");
+             int i = 0;
+             String threadId = null;
+             String user = null;
+             while (st.hasMoreElements()) {
+                 switch (i) {
+                 case 0:
+                     user = (String) st.nextElement();
+                     // A non-numeric token is dropped without advancing i, so the
+                     // following token is tried as the user id instead.
+                     if (!isNumeric(user))
+                         continue;
+                     break;
+
+                 case 1: // folder
+                 case 3: // last-updated
+                 case 4: // is-deleted
+                     // The values are not needed here; parsing is kept so a
+                     // malformed record still fails with NumberFormatException.
+                     Integer.parseInt((String) st.nextElement());
+                     break;
+
+                 case 2:
+                     threadId = (String) st.nextElement();
+                     break;
+
+                 default:
+                     // Trailing fields must still be consumed; otherwise
+                     // hasMoreElements() stays true and this loop never ends.
+                     st.nextElement();
+                     break;
+                 }
+                 ++i;
+             }
+
+             // NOTE(review): Runtime.exec(String) splits the command on
+             // whitespace, so a record containing spaces becomes several
+             // arguments; the child's output is never read - confirm acceptable.
+             String cmd = "php /home/pmalik/www/scripts/mbox_index/search_test.php "
+                     + (new File(filepath)).getName() + " " + user +" " +threadId + " "+ line;
+             Runtime.getRuntime().exec(cmd);
+         }
+     } finally {
+         // Release the file handle even when a record aborts the replay.
+         bufReader.close();
+     }
+ }
+ /**
+  * Runnable that echoes a command line to stdout and starts it as a child
+  * process. The process is not waited for; any exception raised while
+  * printing or starting it is printed and swallowed.
+  */
+ class PhpExecute implements Runnable
+ {
+     /** Command line to start, exactly as handed to the constructor. */
+     private String cmdLine_;
+
+     PhpExecute(String cmdLine)
+     {
+         cmdLine_ = cmdLine;
+     }
+
+     public void run()
+     {
+         try
+         {
+             System.out.println(cmdLine_);
+             // Fire and forget: the child is intentionally not awaited.
+             Runtime.getRuntime().exec(cmdLine_);
+         }
+         catch (Exception failure)
+         {
+             failure.printStackTrace();
+         }
+     }
+ }
+}