You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/04/15 21:10:26 UTC
[1/4] storm git commit: STORM-765: Thrift serialization for local
state.
Repository: storm
Updated Branches:
refs/heads/master 2a45a9a5b -> a7c83108a
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/jvm/backtype/storm/generated/LocalAssignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/LocalAssignment.java b/storm-core/src/jvm/backtype/storm/generated/LocalAssignment.java
new file mode 100644
index 0000000..3c34891
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/LocalAssignment.java
@@ -0,0 +1,561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package backtype.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-4-10")
+public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment, LocalAssignment._Fields>, java.io.Serializable, Cloneable, Comparable<LocalAssignment> { // Thrift struct with two required fields: topology_id (string, id 1) and executors (list of ExecutorInfo, id 2)
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LocalAssignment");
+
+ private static final org.apache.thrift.protocol.TField TOPOLOGY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("topology_id", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField EXECUTORS_FIELD_DESC = new org.apache.thrift.protocol.TField("executors", org.apache.thrift.protocol.TType.LIST, (short)2);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); // scheme class -> factory for this struct's reader/writer; consulted by read() and write()
+ static {
+ schemes.put(StandardScheme.class, new LocalAssignmentStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new LocalAssignmentTupleSchemeFactory());
+ }
+
+ private String topology_id; // required; null means unset
+ private List<ExecutorInfo> executors; // required; null means unset
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ TOPOLOGY_ID((short)1, "topology_id"),
+ EXECUTORS((short)2, "executors");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); // field name -> enum constant, filled by the static block below
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it is not found (only ids 1 and 2 are known).
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // TOPOLOGY_ID
+ return TOPOLOGY_ID;
+ case 2: // EXECUTORS
+ return EXECUTORS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments (none present: both fields are object references, so null marks a field as unset)
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; // unmodifiable per-field metadata, also registered with FieldMetaData in the static block below
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.TOPOLOGY_ID, new org.apache.thrift.meta_data.FieldMetaData("topology_id", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.EXECUTORS, new org.apache.thrift.meta_data.FieldMetaData("executors", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ExecutorInfo.class))));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LocalAssignment.class, metaDataMap);
+ }
+
+ public LocalAssignment() { // leaves both fields null (unset)
+ }
+
+ public LocalAssignment( // stores the given references as-is; no defensive copy is made
+ String topology_id,
+ List<ExecutorInfo> executors)
+ {
+ this();
+ this.topology_id = topology_id;
+ this.executors = executors;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>: the String reference is shared, each ExecutorInfo is copied through its copy constructor into a new ArrayList; unset fields stay unset.
+ */
+ public LocalAssignment(LocalAssignment other) {
+ if (other.is_set_topology_id()) {
+ this.topology_id = other.topology_id;
+ }
+ if (other.is_set_executors()) {
+ List<ExecutorInfo> __this__executors = new ArrayList<ExecutorInfo>(other.executors.size());
+ for (ExecutorInfo other_element : other.executors) {
+ __this__executors.add(new ExecutorInfo(other_element));
+ }
+ this.executors = __this__executors;
+ }
+ }
+
+ public LocalAssignment deepCopy() { // delegates to the copy constructor above
+ return new LocalAssignment(this);
+ }
+
+ @Override
+ public void clear() { // resets both fields to null (unset)
+ this.topology_id = null;
+ this.executors = null;
+ }
+
+ public String get_topology_id() {
+ return this.topology_id;
+ }
+
+ public void set_topology_id(String topology_id) {
+ this.topology_id = topology_id;
+ }
+
+ public void unset_topology_id() {
+ this.topology_id = null;
+ }
+
+ /** Returns true if field topology_id is set (has been assigned a value) and false otherwise; "set" here means the reference is non-null */
+ public boolean is_set_topology_id() {
+ return this.topology_id != null;
+ }
+
+ public void set_topology_id_isSet(boolean value) { // only value == false has an effect (nulls the field); true is a no-op
+ if (!value) {
+ this.topology_id = null;
+ }
+ }
+
+ public int get_executors_size() { // 0 when the list is unset
+ return (this.executors == null) ? 0 : this.executors.size();
+ }
+
+ public java.util.Iterator<ExecutorInfo> get_executors_iterator() { // NOTE: returns null, not an empty iterator, when the list is unset
+ return (this.executors == null) ? null : this.executors.iterator();
+ }
+
+ public void add_to_executors(ExecutorInfo elem) { // creates the backing ArrayList on first use
+ if (this.executors == null) {
+ this.executors = new ArrayList<ExecutorInfo>();
+ }
+ this.executors.add(elem);
+ }
+
+ public List<ExecutorInfo> get_executors() { // returns the internal list reference, not a copy
+ return this.executors;
+ }
+
+ public void set_executors(List<ExecutorInfo> executors) {
+ this.executors = executors;
+ }
+
+ public void unset_executors() {
+ this.executors = null;
+ }
+
+ /** Returns true if field executors is set (has been assigned a value) and false otherwise; "set" here means the reference is non-null */
+ public boolean is_set_executors() {
+ return this.executors != null;
+ }
+
+ public void set_executors_isSet(boolean value) { // only value == false has an effect (nulls the field); true is a no-op
+ if (!value) {
+ this.executors = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) { // generic setter: a null value unsets the field, otherwise the value is cast and stored
+ switch (field) {
+ case TOPOLOGY_ID:
+ if (value == null) {
+ unset_topology_id();
+ } else {
+ set_topology_id((String)value);
+ }
+ break;
+
+ case EXECUTORS:
+ if (value == null) {
+ unset_executors();
+ } else {
+ set_executors((List<ExecutorInfo>)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) { // generic getter keyed by field enum
+ switch (field) {
+ case TOPOLOGY_ID:
+ return get_topology_id();
+
+ case EXECUTORS:
+ return get_executors();
+
+ }
+ throw new IllegalStateException(); // reached only when no case above returned
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise; throws IllegalArgumentException when field is null */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case TOPOLOGY_ID:
+ return is_set_topology_id();
+ case EXECUTORS:
+ return is_set_executors();
+ }
+ throw new IllegalStateException(); // reached only when no case above returned
+ }
+
+ @Override
+ public boolean equals(Object that) { // false for null or a non-LocalAssignment; otherwise defers to the typed overload
+ if (that == null)
+ return false;
+ if (that instanceof LocalAssignment)
+ return this.equals((LocalAssignment)that);
+ return false;
+ }
+
+ public boolean equals(LocalAssignment that) { // equal when each field is unset in both, or set in both with equal values
+ if (that == null)
+ return false;
+
+ boolean this_present_topology_id = true && this.is_set_topology_id();
+ boolean that_present_topology_id = true && that.is_set_topology_id();
+ if (this_present_topology_id || that_present_topology_id) {
+ if (!(this_present_topology_id && that_present_topology_id))
+ return false;
+ if (!this.topology_id.equals(that.topology_id))
+ return false;
+ }
+
+ boolean this_present_executors = true && this.is_set_executors();
+ boolean that_present_executors = true && that.is_set_executors();
+ if (this_present_executors || that_present_executors) {
+ if (!(this_present_executors && that_present_executors))
+ return false;
+ if (!this.executors.equals(that.executors))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() { // hashes a list holding, per field, the is-set flag followed by the value when set
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_topology_id = true && (is_set_topology_id());
+ list.add(present_topology_id);
+ if (present_topology_id)
+ list.add(topology_id);
+
+ boolean present_executors = true && (is_set_executors());
+ list.add(present_executors);
+ if (present_executors)
+ list.add(executors);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(LocalAssignment other) { // orders by class name if the classes differ, then topology_id, then executors; an unset field sorts before a set one
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_topology_id()).compareTo(other.is_set_topology_id());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_topology_id()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.topology_id, other.topology_id);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_executors()).compareTo(other.is_set_executors());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_executors()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.executors, other.executors);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) { // null for unknown ids (see _Fields.findByThriftId)
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { // picks the reader registered in schemes for the scheme class the protocol reports
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { // picks the writer registered in schemes for the scheme class the protocol reports
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() { // renders "LocalAssignment(topology_id:<v>, executors:<v>)" with the literal text null for unset fields
+ StringBuilder sb = new StringBuilder("LocalAssignment(");
+ boolean first = true; // generator's separator flag; it is always false by the time it is tested below
+
+ sb.append("topology_id:");
+ if (this.topology_id == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.topology_id);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("executors:");
+ if (this.executors == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.executors);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException { // throws TProtocolException when either required field is null
+ // check for required fields
+ if (!is_set_topology_id()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'topology_id' is unset! Struct:" + toString());
+ }
+
+ if (!is_set_executors()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'executors' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity (nothing is generated here for this struct)
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { // java.io serialization path: encodes the struct with TCompactProtocol onto the object stream
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // surface Thrift failures as IOException, keeping the cause
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { // java.io deserialization path: decodes the struct with TCompactProtocol from the object stream
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // surface Thrift failures as IOException, keeping the cause
+ }
+ }
+
+ private static class LocalAssignmentStandardSchemeFactory implements SchemeFactory {
+ public LocalAssignmentStandardScheme getScheme() { // a fresh scheme instance on every call
+ return new LocalAssignmentStandardScheme();
+ }
+ }
+
+ private static class LocalAssignmentStandardScheme extends StandardScheme<LocalAssignment> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, LocalAssignment struct) throws org.apache.thrift.TException { // reads fields until STOP, skipping unknown ids and mismatched types, then validates the struct
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // TOPOLOGY_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.topology_id = iprot.readString();
+ struct.set_topology_id_isSet(true); // no-op when passed true
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // EXECUTORS
+ if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list446 = iprot.readListBegin();
+ struct.executors = new ArrayList<ExecutorInfo>(_list446.size); // pre-sized from the element count declared in the list header
+ ExecutorInfo _elem447;
+ for (int _i448 = 0; _i448 < _list446.size; ++_i448)
+ {
+ _elem447 = new ExecutorInfo();
+ _elem447.read(iprot);
+ struct.executors.add(_elem447);
+ }
+ iprot.readListEnd();
+ }
+ struct.set_executors_isSet(true); // no-op when passed true
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, LocalAssignment struct) throws org.apache.thrift.TException { // validates first, then writes each non-null field followed by the STOP marker
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.topology_id != null) {
+ oprot.writeFieldBegin(TOPOLOGY_ID_FIELD_DESC);
+ oprot.writeString(struct.topology_id);
+ oprot.writeFieldEnd();
+ }
+ if (struct.executors != null) {
+ oprot.writeFieldBegin(EXECUTORS_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.executors.size()));
+ for (ExecutorInfo _iter449 : struct.executors)
+ {
+ _iter449.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class LocalAssignmentTupleSchemeFactory implements SchemeFactory {
+ public LocalAssignmentTupleScheme getScheme() { // a fresh scheme instance on every call
+ return new LocalAssignmentTupleScheme();
+ }
+ }
+
+ private static class LocalAssignmentTupleScheme extends TupleScheme<LocalAssignment> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, LocalAssignment struct) throws org.apache.thrift.TException { // writes topology_id, the executor count as i32, then each executor; no field headers, no validate() call, and executors is dereferenced without a null check
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ oprot.writeString(struct.topology_id);
+ {
+ oprot.writeI32(struct.executors.size());
+ for (ExecutorInfo _iter450 : struct.executors)
+ {
+ _iter450.write(oprot);
+ }
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, LocalAssignment struct) throws org.apache.thrift.TException { // mirror of write: a string, an i32 count, then that many ExecutorInfo structs
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ struct.topology_id = iprot.readString();
+ struct.set_topology_id_isSet(true); // no-op when passed true
+ {
+ org.apache.thrift.protocol.TList _list451 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+ struct.executors = new ArrayList<ExecutorInfo>(_list451.size);
+ ExecutorInfo _elem452;
+ for (int _i453 = 0; _i453 < _list451.size; ++_i453)
+ {
+ _elem452 = new ExecutorInfo();
+ _elem452.read(iprot);
+ struct.executors.add(_elem452);
+ }
+ }
+ struct.set_executors_isSet(true); // no-op when passed true
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/jvm/backtype/storm/generated/LocalStateData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/LocalStateData.java b/storm-core/src/jvm/backtype/storm/generated/LocalStateData.java
new file mode 100644
index 0000000..448711d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/LocalStateData.java
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package backtype.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-4-10")
+public class LocalStateData implements org.apache.thrift.TBase<LocalStateData, LocalStateData._Fields>, java.io.Serializable, Cloneable, Comparable<LocalStateData> { // Thrift struct with one required field: serialized_parts (map of string to ThriftSerializedObject, id 1)
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LocalStateData");
+
+ private static final org.apache.thrift.protocol.TField SERIALIZED_PARTS_FIELD_DESC = new org.apache.thrift.protocol.TField("serialized_parts", org.apache.thrift.protocol.TType.MAP, (short)1);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); // scheme class -> factory for this struct's reader/writer; consulted by read() and write()
+ static {
+ schemes.put(StandardScheme.class, new LocalStateDataStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new LocalStateDataTupleSchemeFactory());
+ }
+
+ private Map<String,ThriftSerializedObject> serialized_parts; // required; null means unset
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ SERIALIZED_PARTS((short)1, "serialized_parts");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); // field name -> enum constant, filled by the static block below
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it is not found (only id 1 is known).
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // SERIALIZED_PARTS
+ return SERIALIZED_PARTS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments (none present: the only field is an object reference, so null marks it as unset)
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; // unmodifiable per-field metadata, also registered with FieldMetaData in the static block below
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.SERIALIZED_PARTS, new org.apache.thrift.meta_data.FieldMetaData("serialized_parts", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ThriftSerializedObject.class))));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LocalStateData.class, metaDataMap);
+ }
+
+ public LocalStateData() { // leaves serialized_parts null (unset)
+ }
+
+ public LocalStateData( // stores the given map reference as-is; no defensive copy is made
+ Map<String,ThriftSerializedObject> serialized_parts)
+ {
+ this();
+ this.serialized_parts = serialized_parts;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>: keys are shared String references, each value is copied through the ThriftSerializedObject copy constructor into a new HashMap; an unset map stays unset.
+ */
+ public LocalStateData(LocalStateData other) {
+ if (other.is_set_serialized_parts()) {
+ Map<String,ThriftSerializedObject> __this__serialized_parts = new HashMap<String,ThriftSerializedObject>(other.serialized_parts.size());
+ for (Map.Entry<String, ThriftSerializedObject> other_element : other.serialized_parts.entrySet()) {
+
+ String other_element_key = other_element.getKey();
+ ThriftSerializedObject other_element_value = other_element.getValue();
+
+ String __this__serialized_parts_copy_key = other_element_key;
+
+ ThriftSerializedObject __this__serialized_parts_copy_value = new ThriftSerializedObject(other_element_value);
+
+ __this__serialized_parts.put(__this__serialized_parts_copy_key, __this__serialized_parts_copy_value);
+ }
+ this.serialized_parts = __this__serialized_parts;
+ }
+ }
+
+ public LocalStateData deepCopy() { // delegates to the copy constructor above
+ return new LocalStateData(this);
+ }
+
+ @Override
+ public void clear() { // resets the field to null (unset)
+ this.serialized_parts = null;
+ }
+
+ public int get_serialized_parts_size() { // 0 when the map is unset
+ return (this.serialized_parts == null) ? 0 : this.serialized_parts.size();
+ }
+
+ public void put_to_serialized_parts(String key, ThriftSerializedObject val) { // creates the backing HashMap on first use
+ if (this.serialized_parts == null) {
+ this.serialized_parts = new HashMap<String,ThriftSerializedObject>();
+ }
+ this.serialized_parts.put(key, val);
+ }
+
+ public Map<String,ThriftSerializedObject> get_serialized_parts() { // returns the internal map reference, not a copy
+ return this.serialized_parts;
+ }
+
+ public void set_serialized_parts(Map<String,ThriftSerializedObject> serialized_parts) {
+ this.serialized_parts = serialized_parts;
+ }
+
+ public void unset_serialized_parts() {
+ this.serialized_parts = null;
+ }
+
+ /** Returns true if field serialized_parts is set (has been assigned a value) and false otherwise; "set" here means the reference is non-null */
+ public boolean is_set_serialized_parts() {
+ return this.serialized_parts != null;
+ }
+
+ public void set_serialized_parts_isSet(boolean value) { // only value == false has an effect (nulls the field); true is a no-op
+ if (!value) {
+ this.serialized_parts = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) { // generic setter: a null value unsets the field, otherwise the value is cast and stored
+ switch (field) {
+ case SERIALIZED_PARTS:
+ if (value == null) {
+ unset_serialized_parts();
+ } else {
+ set_serialized_parts((Map<String,ThriftSerializedObject>)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) { // generic getter keyed by field enum
+ switch (field) {
+ case SERIALIZED_PARTS:
+ return get_serialized_parts();
+
+ }
+ throw new IllegalStateException(); // reached only when no case above returned
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise; throws IllegalArgumentException when field is null */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case SERIALIZED_PARTS:
+ return is_set_serialized_parts();
+ }
+ throw new IllegalStateException(); // reached only when no case above returned
+ }
+
+ @Override
+ public boolean equals(Object that) { // false for null or a non-LocalStateData; otherwise defers to the typed overload
+ if (that == null)
+ return false;
+ if (that instanceof LocalStateData)
+ return this.equals((LocalStateData)that);
+ return false;
+ }
+
+ public boolean equals(LocalStateData that) { // equal when the map is unset in both, or set in both with equal contents
+ if (that == null)
+ return false;
+
+ boolean this_present_serialized_parts = true && this.is_set_serialized_parts();
+ boolean that_present_serialized_parts = true && that.is_set_serialized_parts();
+ if (this_present_serialized_parts || that_present_serialized_parts) {
+ if (!(this_present_serialized_parts && that_present_serialized_parts))
+ return false;
+ if (!this.serialized_parts.equals(that.serialized_parts))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() { // hashes a list holding the is-set flag followed by the map when set
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_serialized_parts = true && (is_set_serialized_parts());
+ list.add(present_serialized_parts);
+ if (present_serialized_parts)
+ list.add(serialized_parts);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(LocalStateData other) { // orders by class name if the classes differ, then by serialized_parts; an unset field sorts before a set one
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_serialized_parts()).compareTo(other.is_set_serialized_parts());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_serialized_parts()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.serialized_parts, other.serialized_parts);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) { // null for unknown ids (see _Fields.findByThriftId)
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { // picks the reader registered in schemes for the scheme class the protocol reports
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { // picks the writer registered in schemes for the scheme class the protocol reports
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() { // renders "LocalStateData(serialized_parts:<v>)" with the literal text null when unset
+ StringBuilder sb = new StringBuilder("LocalStateData(");
+ boolean first = true; // generator's separator flag; assigned but never tested in this single-field struct
+
+ sb.append("serialized_parts:");
+ if (this.serialized_parts == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.serialized_parts);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException { // throws TProtocolException when the required field is null
+ // check for required fields
+ if (!is_set_serialized_parts()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'serialized_parts' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity (nothing is generated here for this struct)
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { // java.io serialization path: encodes the struct with TCompactProtocol onto the object stream
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // surface Thrift failures as IOException, keeping the cause
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { // java.io deserialization path: decodes the struct with TCompactProtocol from the object stream
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // surface Thrift failures as IOException, keeping the cause
+ }
+ }
+
+ private static class LocalStateDataStandardSchemeFactory implements SchemeFactory {
+ public LocalStateDataStandardScheme getScheme() { // a fresh scheme instance on every call
+ return new LocalStateDataStandardScheme();
+ }
+ }
+
+ private static class LocalStateDataStandardScheme extends StandardScheme<LocalStateData> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, LocalStateData struct) throws org.apache.thrift.TException { // reads fields until STOP, skipping unknown ids and mismatched types, then validates the struct
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // SERIALIZED_PARTS
+ if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
+ {
+ org.apache.thrift.protocol.TMap _map436 = iprot.readMapBegin();
+ struct.serialized_parts = new HashMap<String,ThriftSerializedObject>(2*_map436.size); // initial capacity is twice the entry count declared in the map header
+ String _key437;
+ ThriftSerializedObject _val438;
+ for (int _i439 = 0; _i439 < _map436.size; ++_i439)
+ {
+ _key437 = iprot.readString();
+ _val438 = new ThriftSerializedObject();
+ _val438.read(iprot);
+ struct.serialized_parts.put(_key437, _val438);
+ }
+ iprot.readMapEnd();
+ }
+ struct.set_serialized_parts_isSet(true); // no-op when passed true
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, LocalStateData struct) throws org.apache.thrift.TException { // validates first, then writes the map field when non-null, followed by the STOP marker
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.serialized_parts != null) {
+ oprot.writeFieldBegin(SERIALIZED_PARTS_FIELD_DESC);
+ {
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, struct.serialized_parts.size()));
+ for (Map.Entry<String, ThriftSerializedObject> _iter440 : struct.serialized_parts.entrySet())
+ {
+ oprot.writeString(_iter440.getKey());
+ _iter440.getValue().write(oprot);
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class LocalStateDataTupleSchemeFactory implements SchemeFactory {
+ public LocalStateDataTupleScheme getScheme() { // a fresh scheme instance on every call
+ return new LocalStateDataTupleScheme();
+ }
+ }
+
+ private static class LocalStateDataTupleScheme extends TupleScheme<LocalStateData> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, LocalStateData struct) throws org.apache.thrift.TException { // writes the entry count as i32, then each key/value pair; no field headers, no validate() call, and the map is dereferenced without a null check
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ {
+ oprot.writeI32(struct.serialized_parts.size());
+ for (Map.Entry<String, ThriftSerializedObject> _iter441 : struct.serialized_parts.entrySet())
+ {
+ oprot.writeString(_iter441.getKey());
+ _iter441.getValue().write(oprot);
+ }
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, LocalStateData struct) throws org.apache.thrift.TException { // mirror of write: an i32 count, then that many string key / ThriftSerializedObject value pairs
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ {
+ org.apache.thrift.protocol.TMap _map442 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+ struct.serialized_parts = new HashMap<String,ThriftSerializedObject>(2*_map442.size); // initial capacity is twice the entry count read from the stream
+ String _key443;
+ ThriftSerializedObject _val444;
+ for (int _i445 = 0; _i445 < _map442.size; ++_i445)
+ {
+ _key443 = iprot.readString();
+ _val444 = new ThriftSerializedObject();
+ _val444.read(iprot);
+ struct.serialized_parts.put(_key443, _val444);
+ }
+ }
+ struct.set_serialized_parts_isSet(true); // no-op when passed true
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java b/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java
new file mode 100644
index 0000000..6810669
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package backtype.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-4-10")
+public class ThriftSerializedObject implements org.apache.thrift.TBase<ThriftSerializedObject, ThriftSerializedObject._Fields>, java.io.Serializable, Cloneable, Comparable<ThriftSerializedObject> { // Thrift struct with two required fields: a string "name" and binary "bits"
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ThriftSerializedObject");
+
+ private static final org.apache.thrift.protocol.TField NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("name", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField BITS_FIELD_DESC = new org.apache.thrift.protocol.TField("bits", org.apache.thrift.protocol.TType.STRING, (short)2); // binary data is described with the STRING wire type
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); // scheme class -> factory for this struct's reader/writer; used by read() and write()
+ static {
+ schemes.put(StandardScheme.class, new ThriftSerializedObjectStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new ThriftSerializedObjectTupleSchemeFactory());
+ }
+
+ private String name; // required
+ private ByteBuffer bits; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ NAME((short)1, "name"),
+ BITS((short)2, "bits");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); // field name -> constant, filled once in the static block below
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it is not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // NAME
+ return NAME;
+ case 2: // BITS
+ return BITS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; // unmodifiable; built and registered in the static block below
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.NAME, new org.apache.thrift.meta_data.FieldMetaData("name", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.BITS, new org.apache.thrift.meta_data.FieldMetaData("bits", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ThriftSerializedObject.class, metaDataMap);
+ }
+
+ public ThriftSerializedObject() { // leaves both fields null, i.e. unset
+ }
+
+ public ThriftSerializedObject(
+ String name,
+ ByteBuffer bits)
+ {
+ this();
+ this.name = name;
+ this.bits = org.apache.thrift.TBaseHelper.copyBinary(bits); // stores the result of copyBinary, not the caller's reference
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public ThriftSerializedObject(ThriftSerializedObject other) {
+ if (other.is_set_name()) {
+ this.name = other.name;
+ }
+ if (other.is_set_bits()) {
+ this.bits = org.apache.thrift.TBaseHelper.copyBinary(other.bits);
+ }
+ }
+
+ public ThriftSerializedObject deepCopy() {
+ return new ThriftSerializedObject(this);
+ }
+
+ @Override
+ public void clear() { // resets both fields to null (unset)
+ this.name = null;
+ this.bits = null;
+ }
+
+ public String get_name() {
+ return this.name;
+ }
+
+ public void set_name(String name) {
+ this.name = name;
+ }
+
+ public void unset_name() {
+ this.name = null;
+ }
+
+ /** Returns true if field name is set (has been assigned a value) and false otherwise */
+ public boolean is_set_name() {
+ return this.name != null;
+ }
+
+ public void set_name_isSet(boolean value) { // false clears the field; true changes nothing
+ if (!value) {
+ this.name = null;
+ }
+ }
+
+ public byte[] get_bits() { // returns the buffer's backing array, or null when bits is unset
+ set_bits(org.apache.thrift.TBaseHelper.rightSize(bits)); // side effect: this.bits is re-assigned from the rightSize(...) result before the array is exposed
+ return bits == null ? null : bits.array();
+ }
+
+ public ByteBuffer buffer_for_bits() {
+ return org.apache.thrift.TBaseHelper.copyBinary(bits);
+ }
+
+ public void set_bits(byte[] bits) {
+ this.bits = bits == null ? (ByteBuffer)null : ByteBuffer.wrap(Arrays.copyOf(bits, bits.length)); // wraps a copy, so later changes to the caller's array are not seen
+ }
+
+ public void set_bits(ByteBuffer bits) {
+ this.bits = org.apache.thrift.TBaseHelper.copyBinary(bits);
+ }
+
+ public void unset_bits() {
+ this.bits = null;
+ }
+
+ /** Returns true if field bits is set (has been assigned a value) and false otherwise */
+ public boolean is_set_bits() {
+ return this.bits != null;
+ }
+
+ public void set_bits_isSet(boolean value) { // false clears the field; true changes nothing
+ if (!value) {
+ this.bits = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) { // generic setter: a null value unsets the chosen field
+ switch (field) {
+ case NAME:
+ if (value == null) {
+ unset_name();
+ } else {
+ set_name((String)value);
+ }
+ break;
+
+ case BITS:
+ if (value == null) {
+ unset_bits();
+ } else {
+ set_bits((ByteBuffer)value); // value must be a ByteBuffer here; any other type fails this cast
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case NAME:
+ return get_name();
+
+ case BITS:
+ return get_bits(); // yields a byte[] (via get_bits), not the ByteBuffer accepted by setFieldValue
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case NAME:
+ return is_set_name();
+ case BITS:
+ return is_set_bits();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof ThriftSerializedObject)
+ return this.equals((ThriftSerializedObject)that);
+ return false;
+ }
+
+ public boolean equals(ThriftSerializedObject that) { // equal when each field is set on both sides with equal values, or unset on both
+ if (that == null)
+ return false;
+
+ boolean this_present_name = true && this.is_set_name();
+ boolean that_present_name = true && that.is_set_name();
+ if (this_present_name || that_present_name) {
+ if (!(this_present_name && that_present_name))
+ return false;
+ if (!this.name.equals(that.name))
+ return false;
+ }
+
+ boolean this_present_bits = true && this.is_set_bits();
+ boolean that_present_bits = true && that.is_set_bits();
+ if (this_present_bits || that_present_bits) {
+ if (!(this_present_bits && that_present_bits))
+ return false;
+ if (!this.bits.equals(that.bits))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() { // hashes a list holding each field's set-flag followed by its value when set
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_name = true && (is_set_name());
+ list.add(present_name);
+ if (present_name)
+ list.add(name);
+
+ boolean present_bits = true && (is_set_bits());
+ list.add(present_bits);
+ if (present_bits)
+ list.add(bits);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(ThriftSerializedObject other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName()); // different runtime classes are ordered by class name
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_name()).compareTo(other.is_set_name()); // set-ness is compared first: unset orders before set
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_name()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.name, other.name);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_bits()).compareTo(other.is_set_bits()); // bits only matters when name compares equal
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_bits()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.bits, other.bits);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this); // dispatches to the scheme registered for the protocol's scheme class
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this); // same dispatch as read(), for the write direction
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("ThriftSerializedObject(");
+ boolean first = true;
+
+ sb.append("name:");
+ if (this.name == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.name);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("bits:");
+ if (this.bits == null) {
+ sb.append("null");
+ } else {
+ org.apache.thrift.TBaseHelper.toString(this.bits, sb); // the helper appends its rendering of the buffer to sb
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException { // throws TProtocolException when either required field is unset
+ // check for required fields
+ if (!is_set_name()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'name' is unset! Struct:" + toString());
+ }
+
+ if (!is_set_bits()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'bits' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { // Java serialization hook: the object is written as Thrift compact-protocol bytes
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { // counterpart of writeObject: reads compact-protocol bytes from the stream
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class ThriftSerializedObjectStandardSchemeFactory implements SchemeFactory {
+ public ThriftSerializedObjectStandardScheme getScheme() {
+ return new ThriftSerializedObjectStandardScheme();
+ }
+ }
+
+ private static class ThriftSerializedObjectStandardScheme extends StandardScheme<ThriftSerializedObject> { // field-tagged encoding: each field is written with its header
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, ThriftSerializedObject struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true) // consume fields until the STOP marker
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // NAME
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.name = iprot.readString();
+ struct.set_name_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // known id but unexpected wire type: the value is skipped
+ }
+ break;
+ case 2: // BITS
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.bits = iprot.readBinary();
+ struct.set_bits_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // unknown field ids are skipped rather than rejected
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate(); // throws if a required field was absent from the input
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, ThriftSerializedObject struct) throws org.apache.thrift.TException {
+ struct.validate(); // refuses to write a struct that is missing a required field
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.name != null) {
+ oprot.writeFieldBegin(NAME_FIELD_DESC);
+ oprot.writeString(struct.name);
+ oprot.writeFieldEnd();
+ }
+ if (struct.bits != null) {
+ oprot.writeFieldBegin(BITS_FIELD_DESC);
+ oprot.writeBinary(struct.bits);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class ThriftSerializedObjectTupleSchemeFactory implements SchemeFactory {
+ public ThriftSerializedObjectTupleScheme getScheme() {
+ return new ThriftSerializedObjectTupleScheme();
+ }
+ }
+
+ private static class ThriftSerializedObjectTupleScheme extends TupleScheme<ThriftSerializedObject> { // positional encoding: name then bits, without field headers
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, ThriftSerializedObject struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ oprot.writeString(struct.name); // unlike the standard scheme, validate() is not called on this path
+ oprot.writeBinary(struct.bits);
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, ThriftSerializedObject struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ struct.name = iprot.readString(); // fields are read in the same fixed order write() emits them
+ struct.set_name_isSet(true);
+ struct.bits = iprot.readBinary();
+ struct.set_bits_isSet(true);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/jvm/backtype/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
index 65f2152..b6ecba2 100644
--- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java
+++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
@@ -22,19 +22,27 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.HashMap;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+import backtype.storm.generated.LocalStateData;
+import backtype.storm.generated.ThriftSerializedObject;
+
/**
* A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes.
* Every read/write hits disk.
*/
public class LocalState {
public static Logger LOG = LoggerFactory.getLogger(LocalState.class);
-
private VersionedStore _vs;
public LocalState(String backingDir) throws IOException {
@@ -42,73 +50,154 @@ public class LocalState {
_vs = new VersionedStore(backingDir);
}
- public synchronized Map<Object, Object> snapshot() throws IOException {
+ public synchronized Map<String, TBase> snapshot() { // returns every stored key with its value fully deserialized from the latest version
int attempts = 0;
while(true) {
try {
return deserializeLatestVersion();
- } catch (IOException e) {
+ } catch (Exception e) { // any failure, not only IOException, is retried immediately and without logging
attempts++;
if (attempts >= 10) {
- throw e;
+ throw new RuntimeException(e); // 10th consecutive failure: give up and rethrow unchecked
}
}
}
}
- private Map<Object, Object> deserializeLatestVersion() throws IOException {
- String latestPath = _vs.mostRecentVersionPath();
- Map<Object, Object> result = new HashMap<Object, Object>();
- if (latestPath != null) {
- byte[] serialized = FileUtils.readFileToByteArray(new File(latestPath));
- if (serialized.length == 0) {
- LOG.warn("LocalState file '{}' contained no data, resetting state", latestPath);
- } else {
- result = Utils.javaDeserialize(serialized, Map.class);
- }
+ private Map<String, TBase> deserializeLatestVersion() throws IOException { // builds a key -> TBase map by decoding every stored entry of the latest version
+ Map<String, TBase> result = new HashMap<String, TBase>();
+ TDeserializer td = new TDeserializer(); // one deserializer shared by the container read and every value
+ for (Map.Entry<String, ThriftSerializedObject> ent: partialDeserializeLatestVersion(td).entrySet()) {
+ result.put(ent.getKey(), deserialize(ent.getValue(), td)); // turn the stored (class name, bytes) pair back into its Thrift object
}
return result;
}
- public Object get(Object key) throws IOException {
- return snapshot().get(key);
+    /**
+     * Rebuilds a Thrift object from its stored form.
+     *
+     * Loads the class named by {@code obj.get_name()}, instantiates it through its
+     * no-argument constructor and fills it from {@code obj.get_bits()}.
+     *
+     * @param obj stored class name plus serialized bytes
+     * @param td  deserializer used to decode the bytes
+     * @return the populated instance
+     * @throws RuntimeException wrapping any failure (class lookup, instantiation,
+     *         cast to TBase, or decoding)
+     */
+    private TBase deserialize(ThriftSerializedObject obj, TDeserializer td) {
+        try {
+            TBase decoded = (TBase) Class.forName(obj.get_name()).newInstance();
+            td.deserialize(decoded, obj.get_bits());
+            return decoded;
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Reads the latest version file and decodes only the outer LocalStateData
+     * container; each value stays in its serialized ThriftSerializedObject form.
+     *
+     * @param td deserializer to use; when {@code null} a new one is created if
+     *           the file actually has to be decoded
+     * @return the stored entries, or a new empty map when no version exists yet
+     *         or the latest file is empty
+     * @throws RuntimeException wrapping any failure while reading or decoding
+     */
+    private Map<String, ThriftSerializedObject> partialDeserializeLatestVersion(TDeserializer td) {
+        try {
+            String newestPath = _vs.mostRecentVersionPath();
+            if (newestPath == null) {
+                // No version path was reported: start from an empty state.
+                return new HashMap<String, ThriftSerializedObject>();
+            }
+            byte[] raw = FileUtils.readFileToByteArray(new File(newestPath));
+            if (raw.length == 0) {
+                LOG.warn("LocalState file '{}' contained no data, resetting state", newestPath);
+                return new HashMap<String, ThriftSerializedObject>();
+            }
+            TDeserializer decoder = (td != null) ? td : new TDeserializer();
+            LocalStateData container = new LocalStateData();
+            decoder.deserialize(container, raw);
+            return container.get_serialized_parts();
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Retrying wrapper around partialDeserializeLatestVersion.
+     *
+     * A failed read is retried immediately; the 10th consecutive failure is
+     * rethrown wrapped in a RuntimeException. Earlier failures are dropped
+     * without being logged.
+     *
+     * @param td deserializer handed through to the read; may be {@code null}
+     * @return the stored entries in serialized form
+     */
+    private synchronized Map<String, ThriftSerializedObject> partialSnapshot(TDeserializer td) {
+        for (int attempt = 1; ; attempt++) {
+            try {
+                return partialDeserializeLatestVersion(td);
+            } catch (Exception e) {
+                if (attempt >= 10) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+ public TBase get(String key) { // looks up a single key; returns null when the key is not stored
+ TDeserializer td = new TDeserializer(); // one deserializer reused for the container and the value
+ Map<String, ThriftSerializedObject> partial = partialSnapshot(td);
+ ThriftSerializedObject tso = partial.get(key);
+ TBase ret = null;
+ if (tso != null) {
+ ret = deserialize(tso, td); // only the requested entry is decoded into a TBase
+ }
+ return ret;
}
- public synchronized void put(Object key, Object val) throws IOException {
+ public void put(String key, TBase val) { // convenience overload: stores the value with cleanup enabled
put(key, val, true);
}
- public synchronized void put(Object key, Object val, boolean cleanup) throws IOException {
- Map<Object, Object> curr = snapshot();
- curr.put(key, val);
- persist(curr, cleanup);
+ public synchronized void put(String key, TBase val, boolean cleanup) { // read-modify-write: loads current entries, replaces one, writes a new version
+ Map<String, ThriftSerializedObject> curr = partialSnapshot(null); // existing values stay in serialized form; they are not decoded
+ TSerializer ser = new TSerializer(); // shared by the value and the container serialization below
+ curr.put(key, serialize(val, ser));
+ persistInternal(curr, ser, cleanup);
}
- public synchronized void remove(Object key) throws IOException {
+ public void remove(String key) { // convenience overload: removes the key with cleanup enabled
remove(key, true);
}
- public synchronized void remove(Object key, boolean cleanup) throws IOException {
- Map<Object, Object> curr = snapshot();
+ public synchronized void remove(String key, boolean cleanup) { // read-modify-write: a new version is written even when the key was absent
+ Map<String, ThriftSerializedObject> curr = partialSnapshot(null); // entries stay serialized; nothing needs decoding to drop a key
curr.remove(key);
- persist(curr, cleanup);
+ persistInternal(curr, null, cleanup); // null serializer: persistInternal creates its own
}
public synchronized void cleanup(int keepVersions) throws IOException { // forwards the requested keep-count to the backing VersionedStore
_vs.cleanup(keepVersions);
}
-
- private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
- byte[] toWrite = Utils.javaSerialize(val);
- String newPath = _vs.createVersion();
- File file = new File(newPath);
- FileUtils.writeByteArrayToFile(file, toWrite);
- if (toWrite.length != file.length()) {
- throw new IOException("Tried to serialize " + toWrite.length +
- " bytes to " + file.getCanonicalPath() + ", but " +
- file.length() + " bytes were written.");
+
+    /**
+     * Writes the given entries as a new version of the store.
+     *
+     * The entries are wrapped in a LocalStateData, serialized, written to the
+     * path returned by {@code _vs.createVersion()}, and the version is then
+     * reported through {@code _vs.succeedVersion}.
+     *
+     * @param serialized entries to store, already in serialized form
+     * @param ser        serializer for the container; when {@code null} a new one is created
+     * @param cleanup    when true, {@code _vs.cleanup(4)} runs after the write
+     *                   (presumably pruning to the 4 newest versions; confirm in VersionedStore)
+     * @throws RuntimeException wrapping any failure, including a size mismatch
+     *         between the serialized bytes and the file on disk
+     */
+    private void persistInternal(Map<String, ThriftSerializedObject> serialized, TSerializer ser, boolean cleanup) {
+        try {
+            TSerializer encoder = (ser != null) ? ser : new TSerializer();
+            byte[] payload = encoder.serialize(new LocalStateData(serialized));
+
+            String versionPath = _vs.createVersion();
+            File target = new File(versionPath);
+            FileUtils.writeByteArrayToFile(target, payload);
+            // Sanity check: the file on disk must be exactly as long as the payload.
+            boolean sizeMatches = payload.length == target.length();
+            if (!sizeMatches) {
+                throw new IOException("Tried to serialize " + payload.length +
+                        " bytes to " + target.getCanonicalPath() + ", but " +
+                        target.length() + " bytes were written.");
+            }
+            _vs.succeedVersion(versionPath);
+            if (cleanup) {
+                _vs.cleanup(4);
+            }
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Converts a Thrift object into its stored form: the runtime class name
+     * paired with the object's serialized bytes.
+     *
+     * @param o   object to store
+     * @param ser serializer used to produce the bytes
+     * @return the (class name, bytes) pair
+     * @throws RuntimeException wrapping any failure, including a null {@code o}
+     */
+    private ThriftSerializedObject serialize(TBase o, TSerializer ser) {
+        try {
+            String className = o.getClass().getName();
+            ByteBuffer encoded = ByteBuffer.wrap(ser.serialize(o));
+            return new ThriftSerializedObject(className, encoded);
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Serializes every value of {@code val} and writes the result as a new version.
+     *
+     * NOTE(review): no caller of this method is visible in this patch; put() and
+     * remove() call persistInternal directly. Confirm whether it is still needed.
+     *
+     * @param val     key to Thrift object mapping to store
+     * @param cleanup passed through to persistInternal
+     * @throws RuntimeException wrapping any failure
+     */
+    private void persist(Map<String, TBase> val, boolean cleanup) {
+        try {
+            TSerializer ser = new TSerializer();
+            Map<String, ThriftSerializedObject> serialized = new HashMap<String, ThriftSerializedObject>();
+            for (Map.Entry<String, TBase> ent: val.entrySet()) {
+                // The same serializer is reused for every value and for the container.
+                serialized.put(ent.getKey(), serialize(ent.getValue(), ser));
+            }
+            persistInternal(serialized, ser, cleanup);
+        } catch(Exception e) {
+            throw new RuntimeException(e);
}
- _vs.succeedVersion(newPath);
- if(cleanup) _vs.cleanup(4);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index f683dfe..9876f7b 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -5358,6 +5360,600 @@ class ClusterWorkerHeartbeat:
def __ne__(self, other):
return not (self == other)
+class ThriftSerializedObject:  # Thrift struct carrying a 'name' string and raw 'bits'
+ """
+ Attributes:
+ - name
+ - bits
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'name', None, None, ), # 1
+ (2, TType.STRING, 'bits', None, None, ), # 2
+ )
+
+ def __init__(self, name=None, bits=None,):
+ self.name = name
+ self.bits = bits
+
+ def read(self, iprot):  # fills this object from the protocol
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fast path: hand decoding to fastbinary when protocol and transport allow it
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # field-by-field path: loop until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.name = iprot.readString().decode('utf-8')  # name is decoded from UTF-8
+ else:
+ iprot.skip(ftype)  # known id but unexpected type: skip the value
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.bits = iprot.readString();  # bits is kept exactly as read, with no decoding
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)  # unknown field ids are skipped
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # writes the fields that are not None; validate() is not called here
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fast path: fastbinary encodes the whole struct in one call
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('ThriftSerializedObject')
+ if self.name is not None:
+ oprot.writeFieldBegin('name', TType.STRING, 1)
+ oprot.writeString(self.name.encode('utf-8'))  # name is encoded to UTF-8 before writing
+ oprot.writeFieldEnd()
+ if self.bits is not None:
+ oprot.writeFieldBegin('bits', TType.STRING, 2)
+ oprot.writeString(self.bits)  # bits is written as-is
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):  # raises TProtocolException when either field is None
+ if self.name is None:
+ raise TProtocol.TProtocolException(message='Required field name is unset!')
+ if self.bits is None:
+ raise TProtocol.TProtocolException(message='Required field bits is unset!')
+ return
+
+
+ def __hash__(self):  # combines the hashes of name and bits
+ value = 17
+ value = (value * 31) ^ hash(self.name)
+ value = (value * 31) ^ hash(self.bits)
+ return value
+
+ def __repr__(self):  # ClassName(attr=value, ...) built from the instance __dict__
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class LocalStateData:
+ """
+ Attributes:
+ - serialized_parts
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.MAP, 'serialized_parts', (TType.STRING,None,TType.STRUCT,(ThriftSerializedObject, ThriftSerializedObject.thrift_spec)), None, ), # 1
+ )
+
+ def __init__(self, serialized_parts=None,):
+ self.serialized_parts = serialized_parts
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.MAP:
+ self.serialized_parts = {}
+ (_ktype391, _vtype392, _size390 ) = iprot.readMapBegin()
+ for _i394 in xrange(_size390):
+ _key395 = iprot.readString().decode('utf-8')
+ _val396 = ThriftSerializedObject()
+ _val396.read(iprot)
+ self.serialized_parts[_key395] = _val396
+ iprot.readMapEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('LocalStateData')
+ if self.serialized_parts is not None:
+ oprot.writeFieldBegin('serialized_parts', TType.MAP, 1)
+ oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.serialized_parts))
+ for kiter397,viter398 in self.serialized_parts.items():
+ oprot.writeString(kiter397.encode('utf-8'))
+ viter398.write(oprot)
+ oprot.writeMapEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.serialized_parts is None:
+ raise TProtocol.TProtocolException(message='Required field serialized_parts is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.serialized_parts)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class LocalAssignment:
+ """
+ Attributes:
+ - topology_id
+ - executors
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'topology_id', None, None, ), # 1
+ (2, TType.LIST, 'executors', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec)), None, ), # 2
+ )
+
+ def __init__(self, topology_id=None, executors=None,):
+ self.topology_id = topology_id
+ self.executors = executors
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.topology_id = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.LIST:
+ self.executors = []
+ (_etype402, _size399) = iprot.readListBegin()
+ for _i403 in xrange(_size399):
+ _elem404 = ExecutorInfo()
+ _elem404.read(iprot)
+ self.executors.append(_elem404)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('LocalAssignment')
+ if self.topology_id is not None:
+ oprot.writeFieldBegin('topology_id', TType.STRING, 1)
+ oprot.writeString(self.topology_id.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.executors is not None:
+ oprot.writeFieldBegin('executors', TType.LIST, 2)
+ oprot.writeListBegin(TType.STRUCT, len(self.executors))
+ for iter405 in self.executors:
+ iter405.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.topology_id is None:
+ raise TProtocol.TProtocolException(message='Required field topology_id is unset!')
+ if self.executors is None:
+ raise TProtocol.TProtocolException(message='Required field executors is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.topology_id)
+ value = (value * 31) ^ hash(self.executors)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class LSSupervisorId:  # Thrift struct wrapping a single 'supervisor_id' string
+ """
+ Attributes:
+ - supervisor_id
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'supervisor_id', None, None, ), # 1
+ )
+
+ def __init__(self, supervisor_id=None,):
+ self.supervisor_id = supervisor_id
+
+ def read(self, iprot):  # fills this object from the protocol
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:  # fast path: hand decoding to fastbinary when protocol and transport allow it
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # field-by-field path: loop until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.supervisor_id = iprot.readString().decode('utf-8')  # value is decoded from UTF-8
+ else:
+ iprot.skip(ftype)  # known id but unexpected type: skip the value
+ else:
+ iprot.skip(ftype)  # unknown field ids are skipped
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # writes supervisor_id when it is not None; validate() is not called here
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:  # fast path: fastbinary encodes the whole struct in one call
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('LSSupervisorId')
+ if self.supervisor_id is not None:
+ oprot.writeFieldBegin('supervisor_id', TType.STRING, 1)
+ oprot.writeString(self.supervisor_id.encode('utf-8'))  # value is encoded to UTF-8 before writing
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):  # raises TProtocolException when supervisor_id is None
+ if self.supervisor_id is None:
+ raise TProtocol.TProtocolException(message='Required field supervisor_id is unset!')
+ return
+
+
+ def __hash__(self):  # derived from the hash of supervisor_id
+ value = 17
+ value = (value * 31) ^ hash(self.supervisor_id)
+ return value
+
+ def __repr__(self):  # ClassName(attr=value, ...) built from the instance __dict__
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):  # equal when other is an instance of this class and the attribute dicts match
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class LSApprovedWorkers:
+  """
+  Thrift struct holding the approved-workers map.
+
+  Attributes:
+   - approved_workers: dict of string -> i32 (field 1); validate() rejects None
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.MAP, 'approved_workers', (TType.STRING,None,TType.I32,None), None, ), # 1
+  )
+
+  def __init__(self, approved_workers=None,):
+    self.approved_workers = approved_workers
+
+  def read(self, iprot):
+    """Populate this struct from iprot; unknown or mistyped fields are skipped."""
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.MAP:
+          self.approved_workers = {}
+          (_ktype407, _vtype408, _size406 ) = iprot.readMapBegin()
+          for _i410 in xrange(_size406):
+            _key411 = iprot.readString().decode('utf-8')
+            _val412 = iprot.readI32()
+            self.approved_workers[_key411] = _val412
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    """Serialize this struct to oprot; a None approved_workers is omitted."""
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('LSApprovedWorkers')
+    if self.approved_workers is not None:
+      oprot.writeFieldBegin('approved_workers', TType.MAP, 1)
+      oprot.writeMapBegin(TType.STRING, TType.I32, len(self.approved_workers))
+      for kiter413,viter414 in self.approved_workers.items():
+        oprot.writeString(kiter413.encode('utf-8'))
+        oprot.writeI32(viter414)
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    """Raise TProtocolException when approved_workers is None."""
+    if self.approved_workers is None:
+      raise TProtocol.TProtocolException(message='Required field approved_workers is unset!')
+    return
+
+
+  def __hash__(self):
+    """
+    Hash the approved_workers field.
+
+    A dict is unhashable, so hashing the field directly raised TypeError
+    whenever it was set. An order-independent frozenset of its (key, value)
+    items is hashed instead, which keeps equal structs hashing equal.
+    """
+    workers = self.approved_workers
+    if workers is not None:
+      workers = frozenset(workers.items())
+    value = 17
+    value = (value * 31) ^ hash(workers)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class LSSupervisorAssignments:
+  """
+  Thrift struct holding a supervisor's assignments map.
+
+  Attributes:
+   - assignments: dict of i32 -> LocalAssignment (field 1); validate() rejects None
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.MAP, 'assignments', (TType.I32,None,TType.STRUCT,(LocalAssignment, LocalAssignment.thrift_spec)), None, ), # 1
+  )
+
+  def __init__(self, assignments=None,):
+    self.assignments = assignments
+
+  def read(self, iprot):
+    """Populate this struct from iprot; unknown or mistyped fields are skipped."""
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.MAP:
+          self.assignments = {}
+          (_ktype416, _vtype417, _size415 ) = iprot.readMapBegin()
+          for _i419 in xrange(_size415):
+            _key420 = iprot.readI32()
+            _val421 = LocalAssignment()
+            _val421.read(iprot)
+            self.assignments[_key420] = _val421
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    """Serialize this struct to oprot; a None assignments is omitted."""
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('LSSupervisorAssignments')
+    if self.assignments is not None:
+      oprot.writeFieldBegin('assignments', TType.MAP, 1)
+      oprot.writeMapBegin(TType.I32, TType.STRUCT, len(self.assignments))
+      for kiter422,viter423 in self.assignments.items():
+        oprot.writeI32(kiter422)
+        viter423.write(oprot)
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    """Raise TProtocolException when assignments is None."""
+    if self.assignments is None:
+      raise TProtocol.TProtocolException(message='Required field assignments is unset!')
+    return
+
+
+  def __hash__(self):
+    """
+    Hash the assignments field.
+
+    A dict is unhashable, so hashing the field directly raised TypeError
+    whenever it was set. Only the key set is hashed: the values are mutable
+    structs whose own hash may not be usable, and equal dicts always share
+    the same keys, so equal structs still hash equal.
+    """
+    assignments = self.assignments
+    if assignments is not None:
+      assignments = frozenset(assignments)
+    value = 17
+    value = (value * 31) ^ hash(assignments)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class LSWorkerHeartbeat:
+  """
+  Thrift struct describing one worker heartbeat.
+
+  Attributes:
+   - time_secs: i32 (field 1)
+   - topology_id: string (field 2)
+   - executors: list of ExecutorInfo (field 3)
+   - port: i32 (field 4)
+
+  validate() rejects None for every attribute.
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.I32, 'time_secs', None, None, ), # 1
+    (2, TType.STRING, 'topology_id', None, None, ), # 2
+    (3, TType.LIST, 'executors', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec)), None, ), # 3
+    (4, TType.I32, 'port', None, None, ), # 4
+  )
+
+  def __init__(self, time_secs=None, topology_id=None, executors=None, port=None,):
+    self.time_secs = time_secs
+    self.topology_id = topology_id
+    self.executors = executors
+    self.port = port
+
+  def read(self, iprot):
+    """Populate this struct from iprot; unknown or mistyped fields are skipped."""
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.I32:
+          self.time_secs = iprot.readI32()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.topology_id = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.LIST:
+          self.executors = []
+          (_etype427, _size424) = iprot.readListBegin()
+          for _i428 in xrange(_size424):
+            _elem429 = ExecutorInfo()
+            _elem429.read(iprot)
+            self.executors.append(_elem429)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.I32:
+          self.port = iprot.readI32()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    """Serialize this struct to oprot; attributes that are None are omitted."""
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('LSWorkerHeartbeat')
+    if self.time_secs is not None:
+      oprot.writeFieldBegin('time_secs', TType.I32, 1)
+      oprot.writeI32(self.time_secs)
+      oprot.writeFieldEnd()
+    if self.topology_id is not None:
+      oprot.writeFieldBegin('topology_id', TType.STRING, 2)
+      oprot.writeString(self.topology_id.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.executors is not None:
+      oprot.writeFieldBegin('executors', TType.LIST, 3)
+      oprot.writeListBegin(TType.STRUCT, len(self.executors))
+      for iter430 in self.executors:
+        iter430.write(oprot)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    if self.port is not None:
+      oprot.writeFieldBegin('port', TType.I32, 4)
+      oprot.writeI32(self.port)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    """Raise TProtocolException for the first attribute found to be None."""
+    if self.time_secs is None:
+      raise TProtocol.TProtocolException(message='Required field time_secs is unset!')
+    if self.topology_id is None:
+      raise TProtocol.TProtocolException(message='Required field topology_id is unset!')
+    if self.executors is None:
+      raise TProtocol.TProtocolException(message='Required field executors is unset!')
+    if self.port is None:
+      raise TProtocol.TProtocolException(message='Required field port is unset!')
+    return
+
+
+  def __hash__(self):
+    """
+    Combine the hashes of all four attributes.
+
+    executors is a list, which is unhashable, so hashing it directly raised
+    TypeError whenever the field was set. An immutable tuple snapshot is
+    hashed instead; None is still hashed as None.
+    """
+    executors = self.executors
+    if executors is not None:
+      # assumes ExecutorInfo defines __hash__ -- TODO confirm
+      executors = tuple(executors)
+    value = 17
+    value = (value * 31) ^ hash(self.time_secs)
+    value = (value * 31) ^ hash(self.topology_id)
+    value = (value * 31) ^ hash(executors)
+    value = (value * 31) ^ hash(self.port)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
class GetInfoOptions:
"""
Attributes:
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index db4a7b3..f3b45e6 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -294,6 +294,39 @@ struct ClusterWorkerHeartbeat {
3: required i32 time_secs;
}
+// A named blob of bytes.
+// NOTE(review): presumably `name` identifies the Thrift type and `bits` holds
+// its serialized form -- confirm against LocalState.java.
+struct ThriftSerializedObject {
+ 1: required string name;
+ 2: required binary bits;
+}
+
+// Container mapping string keys to ThriftSerializedObject values.
+// NOTE(review): presumably the on-disk snapshot format of LocalState -- confirm.
+struct LocalStateData {
+ 1: required map<string, ThriftSerializedObject> serialized_parts;
+}
+
+// A topology id together with a list of executors; used as the value type of
+// LSSupervisorAssignments.assignments.
+struct LocalAssignment {
+ 1: required string topology_id;
+ 2: required list<ExecutorInfo> executors;
+}
+
+// Wrapper around a single supervisor id string.
+struct LSSupervisorId {
+ 1: required string supervisor_id;
+}
+
+// Wrapper around the approved-workers map (string key -> i32 value).
+// NOTE(review): callers appear to use worker id -> port -- confirm.
+struct LSApprovedWorkers {
+ 1: required map<string, i32> approved_workers;
+}
+
+// Wrapper around a map from i32 key to LocalAssignment.
+// NOTE(review): the key appears to be the worker port -- confirm.
+struct LSSupervisorAssignments {
+ 1: required map<i32, LocalAssignment> assignments;
+}
+
+// A worker heartbeat record: timestamp in seconds, topology id, the
+// executors involved, and a port number.
+struct LSWorkerHeartbeat {
+  1: required i32 time_secs;
+  2: required string topology_id;
+  // terminated with ';' like every other field in this file
+  3: required list<ExecutorInfo> executors;
+  4: required i32 port;
+}
+
enum NumErrorsChoice {
ALL,
NONE,
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/test/clj/backtype/storm/local_state_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/local_state_test.clj b/storm-core/test/clj/backtype/storm/local_state_test.clj
index 4bd58ec..1364319 100644
--- a/storm-core/test/clj/backtype/storm/local_state_test.clj
+++ b/storm-core/test/clj/backtype/storm/local_state_test.clj
@@ -17,39 +17,39 @@
(:use [clojure test])
(:use [backtype.storm testing])
(:import [backtype.storm.utils LocalState]
+ [backtype.storm.generated GlobalStreamId]
[org.apache.commons.io FileUtils]
[java.io File]))
(deftest test-local-state
(with-local-tmp [dir1 dir2]
- (let [ls1 (LocalState. dir1)
+ (let [gs-a (GlobalStreamId. "a" "a")
+ gs-b (GlobalStreamId. "b" "b")
+ gs-c (GlobalStreamId. "c" "c")
+ gs-d (GlobalStreamId. "d" "d")
+ ls1 (LocalState. dir1)
ls2 (LocalState. dir2)]
(is (= {} (.snapshot ls1)))
- (.put ls1 "a" 1)
- (.put ls1 "b" 2)
- (is (= {"a" 1 "b" 2} (.snapshot ls1)))
+ (.put ls1 "a" gs-a)
+ (.put ls1 "b" gs-b)
+ (is (= {"a" gs-a "b" gs-b} (.snapshot ls1)))
(is (= {} (.snapshot ls2)))
- (is (= 1 (.get ls1 "a")))
+ (is (= gs-a (.get ls1 "a")))
(is (= nil (.get ls1 "c")))
- (is (= 2 (.get ls1 "b")))
- (is (= {"a" 1 "b" 2} (.snapshot (LocalState. dir1))))
- (.put ls2 "b" 1)
- (.put ls2 "b" 2)
- (.put ls2 "b" 3)
- (.put ls2 "b" 4)
- (.put ls2 "b" 5)
- (.put ls2 "b" 6)
- (.put ls2 "b" 7)
- (.put ls2 "b" 8)
- (is (= 8 (.get ls2 "b")))
- )))
+ (is (= gs-b (.get ls1 "b")))
+ (is (= {"a" gs-a "b" gs-b} (.snapshot (LocalState. dir1))))
+ (.put ls2 "b" gs-a)
+ (.put ls2 "b" gs-b)
+ (.put ls2 "b" gs-c)
+ (.put ls2 "b" gs-d)
+ (is (= gs-d (.get ls2 "b"))))))
(deftest empty-state
(with-local-tmp [dir]
(let [ls (LocalState. dir)
+ gs-a (GlobalStreamId. "a" "a")
data (FileUtils/openOutputStream (File. dir "12345"))
version (FileUtils/openOutputStream (File. dir "12345.version"))]
(is (= nil (.get ls "c")))
- (.put ls "a" 1)
- (is (= 1 (.get ls "a")))
- )))
+ (.put ls "a" gs-a)
+ (is (= gs-a (.get ls "a"))))))
[2/4] storm git commit: STORM-765: Thrift serialization for local
state.
Posted by bo...@apache.org.
STORM-765: Thrift serialization for local state.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ad4375e8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ad4375e8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ad4375e8
Branch: refs/heads/master
Commit: ad4375e837004a0ec777a35f1cec1cdd89465e74
Parents: 6b52e0d
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Apr 13 13:51:09 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Apr 13 13:51:09 2015 -0500
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/common.clj | 13 -
.../clj/backtype/storm/daemon/supervisor.clj | 34 +-
.../src/clj/backtype/storm/daemon/worker.clj | 14 +-
.../src/clj/backtype/storm/local_state.clj | 99 +++
storm-core/src/clj/backtype/storm/testing.clj | 6 +-
.../storm/generated/LSApprovedWorkers.java | 458 +++++++++++
.../generated/LSSupervisorAssignments.java | 471 ++++++++++++
.../storm/generated/LSSupervisorId.java | 406 ++++++++++
.../storm/generated/LSWorkerHeartbeat.java | 755 +++++++++++++++++++
.../storm/generated/LocalAssignment.java | 561 ++++++++++++++
.../storm/generated/LocalStateData.java | 471 ++++++++++++
.../storm/generated/ThriftSerializedObject.java | 516 +++++++++++++
.../jvm/backtype/storm/utils/LocalState.java | 163 +++-
storm-core/src/py/storm/ttypes.py | 596 +++++++++++++++
storm-core/src/storm.thrift | 33 +
.../clj/backtype/storm/local_state_test.clj | 40 +-
16 files changed, 4532 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index c33609d..65482f3 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -58,19 +58,6 @@
(defprotocol DaemonCommon
(waiting? [this]))
-(def LS-WORKER-HEARTBEAT "worker-heartbeat")
-
-;; LocalState constants
-(def LS-ID "supervisor-id")
-(def LS-LOCAL-ASSIGNMENTS "local-assignments")
-(def LS-APPROVED-WORKERS "approved-workers")
-
-(defn mk-local-worker-heartbeat [time-secs storm-id executors port]
- {:time-secs time-secs
- :storm-id storm-id
- :executors executors
- :port port})
-
(defrecord ExecutorStats [^long processed
^long acked
^long emitted
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index d9aaaa3..ca4b18f 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -23,7 +23,7 @@
[java.net URI]
[org.apache.commons.io FileUtils]
[java.io File])
- (:use [backtype.storm config util log timer])
+ (:use [backtype.storm config util log timer local-state])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.daemon [worker :as worker]]
[backtype.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
@@ -37,9 +37,6 @@
(defmulti download-storm-code cluster-mode)
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
-;; used as part of a map from port to this
-(defrecord LocalAssignment [storm-id executors])
-
(defprotocol SupervisorDaemon
(get-id [this])
(get-conf [this])
@@ -75,7 +72,7 @@
(into {} (for [[port executors] port-executors]
;; need to cast to int b/c it might be a long (due to how yaml parses things)
;; doall is to avoid serialization/deserialization problems with lazy seqs
- [(Integer. port) (LocalAssignment. storm-id (doall executors))]
+ [(Integer. port) (mk-local-assignment storm-id (doall executors))]
))))
(defn- read-assignments
@@ -103,8 +100,8 @@
(defn read-worker-heartbeat [conf id]
(let [local-state (worker-state conf id)]
(try
- (.get local-state LS-WORKER-HEARTBEAT)
- (catch IOException e
+ (ls-worker-heartbeat local-state)
+ (catch Exception e
(log-warn e "Failed to read local heartbeat for workerId : " id ",Ignoring exception.")
nil))))
@@ -147,7 +144,7 @@
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
id->heartbeat (read-worker-heartbeats conf)
- approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))]
+ approved-ids (set (keys (ls-approved-workers local-state)))]
(into
{}
(dofor [[id hb] id->heartbeat]
@@ -173,7 +170,7 @@
(defn- wait-for-worker-launch [conf id start-time]
(let [state (worker-state conf id)]
(loop []
- (let [hb (.get state LS-WORKER-HEARTBEAT)]
+ (let [hb (ls-worker-heartbeat state)]
(when (and
(not hb)
(<
@@ -184,7 +181,7 @@
(Time/sleep 500)
(recur)
)))
- (when-not (.get state LS-WORKER-HEARTBEAT)
+ (when-not (ls-worker-heartbeat state)
(log-message "Worker " id " failed to start")
)))
@@ -319,7 +316,7 @@
download-lock (:download-lock supervisor)
^LocalState local-state (:local-state supervisor)
storm-cluster-state (:storm-cluster-state supervisor)
- assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
+ assigned-executors (defaulted (ls-local-assignments local-state) {})
now (current-time-secs)
allocated (read-allocated-workers supervisor assigned-executors now)
keepers (filter-val
@@ -357,9 +354,9 @@
(doseq [id (vals new-worker-ids)]
(local-mkdirs (worker-pids-root conf id))
(local-mkdirs (worker-heartbeats-root conf id)))
- (.put local-state LS-APPROVED-WORKERS
+ (ls-approved-workers! local-state
(merge
- (select-keys (.get local-state LS-APPROVED-WORKERS)
+ (select-keys (ls-approved-workers local-state)
(keys keepers))
(zipmap (vals new-worker-ids) (keys new-worker-ids))
))
@@ -415,7 +412,7 @@
(defn shutdown-disallowed-workers [supervisor]
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
- assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
+ assigned-executors (defaulted (ls-local-assignments local-state) {})
now (current-time-secs)
allocated (read-allocated-workers supervisor assigned-executors now)
disallowed (keys (filter-val
@@ -441,7 +438,7 @@
assignment-versions)
storm-code-map (read-storm-code-locations assignments-snapshot)
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
- existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)
+ existing-assignment (ls-local-assignments local-state)
all-assignment (read-assignments assignments-snapshot
(:assignment-id supervisor)
existing-assignment
@@ -471,8 +468,7 @@
(set (keys new-assignment)))]
(.killedWorker isupervisor (int p)))
(.assigned isupervisor (keys new-assignment))
- (.put local-state
- LS-LOCAL-ASSIGNMENTS
+ (ls-local-assignments! local-state
new-assignment)
(reset! (:assignment-versions supervisor) versions)
(reset! (:curr-assignment supervisor) new-assignment)
@@ -779,10 +775,10 @@
(prepare [this conf local-dir]
(reset! conf-atom conf)
(let [state (LocalState. local-dir)
- curr-id (if-let [id (.get state LS-ID)]
+ curr-id (if-let [id (ls-supervisor-id state)]
id
(generate-supervisor-id))]
- (.put state LS-ID curr-id)
+ (ls-supervisor-id! state curr-id)
(reset! id-atom curr-id))
)
(confirmAssigned [this port]
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index e0263d6..978ea16 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -15,7 +15,7 @@
;; limitations under the License.
(ns backtype.storm.daemon.worker
(:use [backtype.storm.daemon common])
- (:use [backtype.storm config log util timer])
+ (:use [backtype.storm config log util timer local-state])
(:require [backtype.storm.daemon [executor :as executor]])
(:require [backtype.storm [disruptor :as disruptor] [cluster :as cluster]])
(:require [clojure.set :as set])
@@ -68,19 +68,9 @@
(defn do-heartbeat [worker]
(let [conf (:conf worker)
- hb (mk-local-worker-heartbeat
- (current-time-secs)
- (:storm-id worker)
- (:executors worker)
- (:port worker))
state (worker-state conf (:worker-id worker))]
- (log-debug "Doing heartbeat " (pr-str hb))
;; do the local-file-system heartbeat.
- (.put state
- LS-WORKER-HEARTBEAT
- hb
- false
- )
+ (ls-worker-heartbeat! state (current-time-secs) (:storm-id worker) (:executors worker) (:port worker))
(.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
; it shouldn't take supervisor 120 seconds between listing dir and reading it
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/clj/backtype/storm/local_state.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/local_state.clj b/storm-core/src/clj/backtype/storm/local_state.clj
new file mode 100644
index 0000000..41e3675
--- /dev/null
+++ b/storm-core/src/clj/backtype/storm/local_state.clj
@@ -0,0 +1,99 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.local-state
+ (:use [backtype.storm log util])
+ (:import [backtype.storm.generated StormTopology
+ InvalidTopologyException GlobalStreamId
+ LSSupervisorId LSApprovedWorkers
+ LSSupervisorAssignments LocalAssignment
+ ExecutorInfo LSWorkerHeartbeat])
+ (:import [backtype.storm.utils LocalState]))
+
+;; String keys under which each kind of record is stored in a LocalState.
+(def LS-WORKER-HEARTBEAT "worker-heartbeat")
+(def LS-ID "supervisor-id")
+(def LS-LOCAL-ASSIGNMENTS "local-assignments")
+(def LS-APPROVED-WORKERS "approved-workers")
+
+(defn ls-supervisor-id!
+  "Stores id in local-state under LS-ID, wrapped in an LSSupervisorId."
+  [^LocalState local-state ^String id]
+  (let [wrapped-id (LSSupervisorId. id)]
+    (.put local-state LS-ID wrapped-id)))
+
+(defn ls-supervisor-id
+  "Returns the supervisor id stored under LS-ID, or nil when nothing is stored."
+  [^LocalState local-state]
+  (when-let [stored (.get local-state LS-ID)]
+    (.get_supervisor_id stored)))
+
+(defn ls-approved-workers!
+  "Stores the workers map in local-state under LS-APPROVED-WORKERS,
+  wrapped in an LSApprovedWorkers."
+  [^LocalState local-state workers]
+  (let [wrapped-workers (LSApprovedWorkers. workers)]
+    (.put local-state LS-APPROVED-WORKERS wrapped-workers)))
+
+(defn ls-approved-workers
+  "Returns the approved-workers map stored under LS-APPROVED-WORKERS copied
+  into a Clojure map, or nil when nothing is stored."
+  [^LocalState local-state]
+  (when-let [stored (.get local-state LS-APPROVED-WORKERS)]
+    (let [workers (.get_approved_workers stored)]
+      (into {} workers))))
+
+(defn ->ExecutorInfo
+  "Builds an ExecutorInfo from a two-element [low high] sequence."
+  [executor]
+  (let [[task-start task-end] executor]
+    (ExecutorInfo. task-start task-end)))
+
+(defn ->ExecutorInfo-list
+  "Lazily converts each [low high] pair in executors into an ExecutorInfo."
+  [executors]
+  (for [executor executors]
+    (->ExecutorInfo executor)))
+
+(defn ->executor-list
+  "Converts a collection of ExecutorInfo objects into a vector of
+  [task-start task-end] pairs."
+  [executors]
+  (let [->pair (fn [info] [(.get_task_start info) (.get_task_end info)])]
+    (into [] (map ->pair executors))))
+
+(defn ->LocalAssignment
+  "Builds a Thrift LocalAssignment from a map with :storm-id and :executors."
+  [{:keys [storm-id executors]}]
+  (let [executor-infos (->ExecutorInfo-list executors)]
+    (LocalAssignment. storm-id executor-infos)))
+
+(defn mk-local-assignment
+  "Returns a map with :storm-id and :executors set to the given values."
+  [storm-id executors]
+  (array-map :storm-id storm-id
+             :executors executors))
+
+(defn ->local-assignment
+  "Converts a Thrift LocalAssignment into the map form produced by
+  mk-local-assignment."
+  [^LocalAssignment thrift-local-assignment]
+  (let [topology-id (.get_topology_id thrift-local-assignment)
+        executors (->executor-list (.get_executors thrift-local-assignment))]
+    (mk-local-assignment topology-id executors)))
+
+(defn ls-local-assignments!
+  "Converts every value of assignments with ->LocalAssignment and stores the
+  result in local-state under LS-LOCAL-ASSIGNMENTS, wrapped in an
+  LSSupervisorAssignments."
+  [^LocalState local-state assignments]
+  (->> assignments
+       (map-val ->LocalAssignment)
+       (LSSupervisorAssignments.)
+       (.put local-state LS-LOCAL-ASSIGNMENTS)))
+
+(defn ls-local-assignments
+  "Returns the assignments stored under LS-LOCAL-ASSIGNMENTS with every value
+  converted by ->local-assignment, or nil when nothing is stored."
+  [^LocalState local-state]
+  (when-let [stored (.get local-state LS-LOCAL-ASSIGNMENTS)]
+    (let [thrift-assignments (.get_assignments stored)]
+      (map-val ->local-assignment thrift-assignments))))
+
+(defn ls-worker-heartbeat!
+  "Stores an LSWorkerHeartbeat built from the arguments in local-state under
+  LS-WORKER-HEARTBEAT."
+  [^LocalState local-state time-secs storm-id executors port]
+  (let [executor-infos (->ExecutorInfo-list executors)
+        heartbeat (LSWorkerHeartbeat. time-secs storm-id executor-infos port)]
+    ;; NOTE(review): the trailing false is handed straight to LocalState.put;
+    ;; presumably it turns off cleanup on write -- confirm.
+    (.put local-state LS-WORKER-HEARTBEAT heartbeat false)))
+
+(defn ls-worker-heartbeat
+  "Returns the heartbeat stored under LS-WORKER-HEARTBEAT as a map with keys
+  :time-secs, :storm-id, :executors and :port, or nil when nothing is stored."
+  [^LocalState local-state]
+  (when-let [stored (.get local-state LS-WORKER-HEARTBEAT)]
+    (let [executors (->executor-list (.get_executors stored))]
+      {:time-secs (.get_time_secs stored)
+       :storm-id (.get_topology_id stored)
+       :executors executors
+       :port (.get_port stored)})))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 02bf13d..eccb46a 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -42,7 +42,7 @@
(:require [backtype.storm [zookeeper :as zk]])
(:require [backtype.storm.messaging.loader :as msg-loader])
(:require [backtype.storm.daemon.acker :as acker])
- (:use [backtype.storm cluster util thrift config log]))
+ (:use [backtype.storm cluster util thrift config log local-state]))
(defn feeder-spout
[fields]
@@ -302,13 +302,13 @@
(defn find-worker-id
[supervisor-conf port]
(let [supervisor-state (supervisor-state supervisor-conf)
- worker->port (.get supervisor-state common/LS-APPROVED-WORKERS)]
+ worker->port (ls-approved-workers supervisor-state)]
(first ((reverse-map worker->port) port))))
(defn find-worker-port
[supervisor-conf worker-id]
(let [supervisor-state (supervisor-state supervisor-conf)
- worker->port (.get supervisor-state common/LS-APPROVED-WORKERS)]
+ worker->port (ls-approved-workers supervisor-state)]
(worker->port worker-id)))
(defn mk-capture-shutdown-fn
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/jvm/backtype/storm/generated/LSApprovedWorkers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/LSApprovedWorkers.java b/storm-core/src/jvm/backtype/storm/generated/LSApprovedWorkers.java
new file mode 100644
index 0000000..1d97dca
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/LSApprovedWorkers.java
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package backtype.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-4-10")
+public class LSApprovedWorkers implements org.apache.thrift.TBase<LSApprovedWorkers, LSApprovedWorkers._Fields>, java.io.Serializable, Cloneable, Comparable<LSApprovedWorkers> { // Thrift struct with a single required field: approved_workers (map of string to i32)
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LSApprovedWorkers");
+
+ private static final org.apache.thrift.protocol.TField APPROVED_WORKERS_FIELD_DESC = new org.apache.thrift.protocol.TField("approved_workers", org.apache.thrift.protocol.TType.MAP, (short)1);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); // scheme class -> factory; looked up by read()/write()
+ static {
+ schemes.put(StandardScheme.class, new LSApprovedWorkersStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new LSApprovedWorkersTupleSchemeFactory());
+ }
+
+ private Map<String,Integer> approved_workers; // required; null means "unset" (see is_set_approved_workers)
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ APPROVED_WORKERS((short)1, "approved_workers");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it is not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // APPROVED_WORKERS
+ return APPROVED_WORKERS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an
+ * IllegalArgumentException if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments (none declared here: the only field is an object reference, so "set" is tracked as non-null)
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.APPROVED_WORKERS, new org.apache.thrift.meta_data.FieldMetaData("approved_workers", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))));
+ metaDataMap = Collections.unmodifiableMap(tmpMap); // published read-only
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LSApprovedWorkers.class, metaDataMap);
+ }
+
+ public LSApprovedWorkers() { // leaves approved_workers null, i.e. unset
+ }
+
+ public LSApprovedWorkers(
+ Map<String,Integer> approved_workers)
+ {
+ this();
+ this.approved_workers = approved_workers; // stores the caller's map reference; no defensive copy
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public LSApprovedWorkers(LSApprovedWorkers other) {
+ if (other.is_set_approved_workers()) {
+ Map<String,Integer> __this__approved_workers = new HashMap<String,Integer>(other.approved_workers); // fresh map; String/Integer entries are immutable, so copying the map is sufficient
+ this.approved_workers = __this__approved_workers;
+ }
+ }
+
+ public LSApprovedWorkers deepCopy() {
+ return new LSApprovedWorkers(this);
+ }
+
+ @Override
+ public void clear() {
+ this.approved_workers = null; // back to the unset state
+ }
+
+ public int get_approved_workers_size() {
+ return (this.approved_workers == null) ? 0 : this.approved_workers.size(); // an unset map reports size 0
+ }
+
+ public void put_to_approved_workers(String key, int val) {
+ if (this.approved_workers == null) { // create the map lazily on first put
+ this.approved_workers = new HashMap<String,Integer>();
+ }
+ this.approved_workers.put(key, val);
+ }
+
+ public Map<String,Integer> get_approved_workers() {
+ return this.approved_workers; // returns the internal map itself (may be null), not a copy
+ }
+
+ public void set_approved_workers(Map<String,Integer> approved_workers) {
+ this.approved_workers = approved_workers;
+ }
+
+ public void unset_approved_workers() {
+ this.approved_workers = null;
+ }
+
+ /** Returns true if field approved_workers is set (has been assigned a value) and false otherwise */
+ public boolean is_set_approved_workers() {
+ return this.approved_workers != null;
+ }
+
+ public void set_approved_workers_isSet(boolean value) {
+ if (!value) { // only false has an effect (it unsets the field); passing true is a no-op
+ this.approved_workers = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case APPROVED_WORKERS:
+ if (value == null) {
+ unset_approved_workers();
+ } else {
+ set_approved_workers((Map<String,Integer>)value); // unchecked cast; warning suppressed at class level
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case APPROVED_WORKERS:
+ return get_approved_workers();
+
+ }
+ throw new IllegalStateException(); // reached only if field matches no case above
+ }
+
+ /** Returns true if the given field is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case APPROVED_WORKERS:
+ return is_set_approved_workers();
+ }
+ throw new IllegalStateException(); // reached only if field matches no case above
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof LSApprovedWorkers)
+ return this.equals((LSApprovedWorkers)that); // delegate to the typed overload below
+ return false;
+ }
+
+ public boolean equals(LSApprovedWorkers that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_approved_workers = true && this.is_set_approved_workers();
+ boolean that_present_approved_workers = true && that.is_set_approved_workers();
+ if (this_present_approved_workers || that_present_approved_workers) {
+ if (!(this_present_approved_workers && that_present_approved_workers)) // set on one side only => not equal
+ return false;
+ if (!this.approved_workers.equals(that.approved_workers))
+ return false;
+ }
+
+ return true; // both unset, or both set with equal maps
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>(); // hash is derived from [isSet flag, value-if-set]
+
+ boolean present_approved_workers = true && (is_set_approved_workers());
+ list.add(present_approved_workers);
+ if (present_approved_workers)
+ list.add(approved_workers);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(LSApprovedWorkers other) {
+ if (!getClass().equals(other.getClass())) { // different runtime classes are ordered by class name
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_approved_workers()).compareTo(other.is_set_approved_workers()); // unset sorts before set
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_approved_workers()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.approved_workers, other.approved_workers);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this); // pick the scheme registered for this protocol and delegate
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this); // pick the scheme registered for this protocol and delegate
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("LSApprovedWorkers(");
+ boolean first = true; // generator artifact: assigned here and below but never read
+
+ sb.append("approved_workers:");
+ if (this.approved_workers == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.approved_workers);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!is_set_approved_workers()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'approved_workers' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity (nothing to check: the only field is a map of string to i32)
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { // Java serialization hook: emits the struct in Thrift compact protocol
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // rethrow with the Thrift exception preserved as cause
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { // Java serialization hook: reads the struct back from Thrift compact protocol
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // rethrow with the Thrift exception preserved as cause
+ }
+ }
+
+ private static class LSApprovedWorkersStandardSchemeFactory implements SchemeFactory {
+ public LSApprovedWorkersStandardScheme getScheme() {
+ return new LSApprovedWorkersStandardScheme();
+ }
+ }
+
+ private static class LSApprovedWorkersStandardScheme extends StandardScheme<LSApprovedWorkers> { // field-tagged encoding: reads/writes field headers and a STOP marker
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, LSApprovedWorkers struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { // end of struct
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // APPROVED_WORKERS
+ if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
+ {
+ org.apache.thrift.protocol.TMap _map454 = iprot.readMapBegin();
+ struct.approved_workers = new HashMap<String,Integer>(2*_map454.size); // initial capacity is twice the declared entry count
+ String _key455;
+ int _val456;
+ for (int _i457 = 0; _i457 < _map454.size; ++_i457)
+ {
+ _key455 = iprot.readString();
+ _val456 = iprot.readI32();
+ struct.approved_workers.put(_key455, _val456);
+ }
+ iprot.readMapEnd();
+ }
+ struct.set_approved_workers_isSet(true); // no-op when passed true; the map assignment above already marks the field set
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // field id 1 with an unexpected wire type: skip the value
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // unknown field id: skip the value
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate(); // throws TProtocolException if approved_workers was never read
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, LSApprovedWorkers struct) throws org.apache.thrift.TException {
+ struct.validate(); // refuse to serialize a struct whose required field is unset
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.approved_workers != null) {
+ oprot.writeFieldBegin(APPROVED_WORKERS_FIELD_DESC);
+ {
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I32, struct.approved_workers.size()));
+ for (Map.Entry<String, Integer> _iter458 : struct.approved_workers.entrySet())
+ {
+ oprot.writeString(_iter458.getKey());
+ oprot.writeI32(_iter458.getValue()); // auto-unboxes; a null map value would throw NullPointerException
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class LSApprovedWorkersTupleSchemeFactory implements SchemeFactory {
+ public LSApprovedWorkersTupleScheme getScheme() {
+ return new LSApprovedWorkersTupleScheme();
+ }
+ }
+
+ private static class LSApprovedWorkersTupleScheme extends TupleScheme<LSApprovedWorkers> { // positional encoding: entry count followed by key/value pairs, no field headers
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, LSApprovedWorkers struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ {
+ oprot.writeI32(struct.approved_workers.size()); // NOTE(review): no validate()/null check in this path; an unset map throws NullPointerException here
+ for (Map.Entry<String, Integer> _iter459 : struct.approved_workers.entrySet())
+ {
+ oprot.writeString(_iter459.getKey());
+ oprot.writeI32(_iter459.getValue());
+ }
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, LSApprovedWorkers struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ {
+ org.apache.thrift.protocol.TMap _map460 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I32, iprot.readI32()); // key/value types are fixed by the schema; only the size is read from the stream
+ struct.approved_workers = new HashMap<String,Integer>(2*_map460.size);
+ String _key461;
+ int _val462;
+ for (int _i463 = 0; _i463 < _map460.size; ++_i463)
+ {
+ _key461 = iprot.readString();
+ _val462 = iprot.readI32();
+ struct.approved_workers.put(_key461, _val462);
+ }
+ }
+ struct.set_approved_workers_isSet(true); // no-op when passed true
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/jvm/backtype/storm/generated/LSSupervisorAssignments.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/LSSupervisorAssignments.java b/storm-core/src/jvm/backtype/storm/generated/LSSupervisorAssignments.java
new file mode 100644
index 0000000..4667287
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/LSSupervisorAssignments.java
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package backtype.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-4-10")
+public class LSSupervisorAssignments implements org.apache.thrift.TBase<LSSupervisorAssignments, LSSupervisorAssignments._Fields>, java.io.Serializable, Cloneable, Comparable<LSSupervisorAssignments> { // Thrift struct with a single required field: assignments (map of i32 to LocalAssignment)
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LSSupervisorAssignments");
+
+ private static final org.apache.thrift.protocol.TField ASSIGNMENTS_FIELD_DESC = new org.apache.thrift.protocol.TField("assignments", org.apache.thrift.protocol.TType.MAP, (short)1);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); // scheme class -> factory; looked up by read()/write()
+ static {
+ schemes.put(StandardScheme.class, new LSSupervisorAssignmentsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new LSSupervisorAssignmentsTupleSchemeFactory());
+ }
+
+ private Map<Integer,LocalAssignment> assignments; // required; null means "unset" (see is_set_assignments)
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ ASSIGNMENTS((short)1, "assignments");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it is not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // ASSIGNMENTS
+ return ASSIGNMENTS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an
+ * IllegalArgumentException if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments (none declared here: the only field is an object reference, so "set" is tracked as non-null)
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.ASSIGNMENTS, new org.apache.thrift.meta_data.FieldMetaData("assignments", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32),
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, LocalAssignment.class))));
+ metaDataMap = Collections.unmodifiableMap(tmpMap); // published read-only
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LSSupervisorAssignments.class, metaDataMap);
+ }
+
+ public LSSupervisorAssignments() { // leaves assignments null, i.e. unset
+ }
+
+ public LSSupervisorAssignments(
+ Map<Integer,LocalAssignment> assignments)
+ {
+ this();
+ this.assignments = assignments; // stores the caller's map reference; no defensive copy
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public LSSupervisorAssignments(LSSupervisorAssignments other) {
+ if (other.is_set_assignments()) {
+ Map<Integer,LocalAssignment> __this__assignments = new HashMap<Integer,LocalAssignment>(other.assignments.size());
+ for (Map.Entry<Integer, LocalAssignment> other_element : other.assignments.entrySet()) {
+
+ Integer other_element_key = other_element.getKey();
+ LocalAssignment other_element_value = other_element.getValue();
+
+ Integer __this__assignments_copy_key = other_element_key; // Integer is immutable, so the key object is shared
+
+ LocalAssignment __this__assignments_copy_value = new LocalAssignment(other_element_value); // each value is duplicated; presumably LocalAssignment's own deep-copy constructor -- confirm
+
+ __this__assignments.put(__this__assignments_copy_key, __this__assignments_copy_value);
+ }
+ this.assignments = __this__assignments;
+ }
+ }
+
+ public LSSupervisorAssignments deepCopy() {
+ return new LSSupervisorAssignments(this);
+ }
+
+ @Override
+ public void clear() {
+ this.assignments = null; // back to the unset state
+ }
+
+ public int get_assignments_size() {
+ return (this.assignments == null) ? 0 : this.assignments.size(); // an unset map reports size 0
+ }
+
+ public void put_to_assignments(int key, LocalAssignment val) {
+ if (this.assignments == null) { // create the map lazily on first put
+ this.assignments = new HashMap<Integer,LocalAssignment>();
+ }
+ this.assignments.put(key, val);
+ }
+
+ public Map<Integer,LocalAssignment> get_assignments() {
+ return this.assignments; // returns the internal map itself (may be null), not a copy
+ }
+
+ public void set_assignments(Map<Integer,LocalAssignment> assignments) {
+ this.assignments = assignments;
+ }
+
+ public void unset_assignments() {
+ this.assignments = null;
+ }
+
+ /** Returns true if field assignments is set (has been assigned a value) and false otherwise */
+ public boolean is_set_assignments() {
+ return this.assignments != null;
+ }
+
+ public void set_assignments_isSet(boolean value) {
+ if (!value) { // only false has an effect (it unsets the field); passing true is a no-op
+ this.assignments = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case ASSIGNMENTS:
+ if (value == null) {
+ unset_assignments();
+ } else {
+ set_assignments((Map<Integer,LocalAssignment>)value); // unchecked cast; warning suppressed at class level
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case ASSIGNMENTS:
+ return get_assignments();
+
+ }
+ throw new IllegalStateException(); // reached only if field matches no case above
+ }
+
+ /** Returns true if the given field is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case ASSIGNMENTS:
+ return is_set_assignments();
+ }
+ throw new IllegalStateException(); // reached only if field matches no case above
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof LSSupervisorAssignments)
+ return this.equals((LSSupervisorAssignments)that); // delegate to the typed overload below
+ return false;
+ }
+
+ public boolean equals(LSSupervisorAssignments that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_assignments = true && this.is_set_assignments();
+ boolean that_present_assignments = true && that.is_set_assignments();
+ if (this_present_assignments || that_present_assignments) {
+ if (!(this_present_assignments && that_present_assignments)) // set on one side only => not equal
+ return false;
+ if (!this.assignments.equals(that.assignments))
+ return false;
+ }
+
+ return true; // both unset, or both set with equal maps
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>(); // hash is derived from [isSet flag, value-if-set]
+
+ boolean present_assignments = true && (is_set_assignments());
+ list.add(present_assignments);
+ if (present_assignments)
+ list.add(assignments);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(LSSupervisorAssignments other) {
+ if (!getClass().equals(other.getClass())) { // different runtime classes are ordered by class name
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_assignments()).compareTo(other.is_set_assignments()); // unset sorts before set
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_assignments()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.assignments, other.assignments);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this); // pick the scheme registered for this protocol and delegate
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this); // pick the scheme registered for this protocol and delegate
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("LSSupervisorAssignments(");
+ boolean first = true; // generator artifact: assigned here and below but never read
+
+ sb.append("assignments:");
+ if (this.assignments == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.assignments);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!is_set_assignments()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'assignments' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity (nothing is emitted here: the LocalAssignment values inside the map are not validated by this method)
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { // Java serialization hook: emits the struct in Thrift compact protocol
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // rethrow with the Thrift exception preserved as cause
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { // Java serialization hook: reads the struct back from Thrift compact protocol
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // rethrow with the Thrift exception preserved as cause
+ }
+ }
+
+ private static class LSSupervisorAssignmentsStandardSchemeFactory implements SchemeFactory {
+ public LSSupervisorAssignmentsStandardScheme getScheme() {
+ return new LSSupervisorAssignmentsStandardScheme();
+ }
+ }
+
+ private static class LSSupervisorAssignmentsStandardScheme extends StandardScheme<LSSupervisorAssignments> { // field-tagged encoding: reads/writes field headers and a STOP marker
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, LSSupervisorAssignments struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { // end of struct
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // ASSIGNMENTS
+ if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
+ {
+ org.apache.thrift.protocol.TMap _map464 = iprot.readMapBegin();
+ struct.assignments = new HashMap<Integer,LocalAssignment>(2*_map464.size); // initial capacity is twice the declared entry count
+ int _key465;
+ LocalAssignment _val466;
+ for (int _i467 = 0; _i467 < _map464.size; ++_i467)
+ {
+ _key465 = iprot.readI32();
+ _val466 = new LocalAssignment();
+ _val466.read(iprot); // nested struct decodes itself from the same protocol
+ struct.assignments.put(_key465, _val466);
+ }
+ iprot.readMapEnd();
+ }
+ struct.set_assignments_isSet(true); // no-op when passed true; the map assignment above already marks the field set
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // field id 1 with an unexpected wire type: skip the value
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // unknown field id: skip the value
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate(); // throws TProtocolException if assignments was never read
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, LSSupervisorAssignments struct) throws org.apache.thrift.TException {
+ struct.validate(); // refuse to serialize a struct whose required field is unset
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.assignments != null) {
+ oprot.writeFieldBegin(ASSIGNMENTS_FIELD_DESC);
+ {
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.I32, org.apache.thrift.protocol.TType.STRUCT, struct.assignments.size()));
+ for (Map.Entry<Integer, LocalAssignment> _iter468 : struct.assignments.entrySet())
+ {
+ oprot.writeI32(_iter468.getKey()); // auto-unboxes; a null key would throw NullPointerException
+ _iter468.getValue().write(oprot); // nested struct encodes itself; a null value would throw NullPointerException
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class LSSupervisorAssignmentsTupleSchemeFactory implements SchemeFactory {
+ public LSSupervisorAssignmentsTupleScheme getScheme() {
+ return new LSSupervisorAssignmentsTupleScheme();
+ }
+ }
+
+ private static class LSSupervisorAssignmentsTupleScheme extends TupleScheme<LSSupervisorAssignments> { // positional encoding: entry count followed by key/value pairs, no field headers
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, LSSupervisorAssignments struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ {
+ oprot.writeI32(struct.assignments.size()); // NOTE(review): no validate()/null check in this path; an unset map throws NullPointerException here
+ for (Map.Entry<Integer, LocalAssignment> _iter469 : struct.assignments.entrySet())
+ {
+ oprot.writeI32(_iter469.getKey());
+ _iter469.getValue().write(oprot);
+ }
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, LSSupervisorAssignments struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ {
+ org.apache.thrift.protocol.TMap _map470 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.I32, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); // key/value types are fixed by the schema; only the size is read from the stream
+ struct.assignments = new HashMap<Integer,LocalAssignment>(2*_map470.size);
+ int _key471;
+ LocalAssignment _val472;
+ for (int _i473 = 0; _i473 < _map470.size; ++_i473)
+ {
+ _key471 = iprot.readI32();
+ _val472 = new LocalAssignment();
+ _val472.read(iprot);
+ struct.assignments.put(_key471, _val472);
+ }
+ }
+ struct.set_assignments_isSet(true); // no-op when passed true
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/jvm/backtype/storm/generated/LSSupervisorId.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/LSSupervisorId.java b/storm-core/src/jvm/backtype/storm/generated/LSSupervisorId.java
new file mode 100644
index 0000000..6ee4dad
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/LSSupervisorId.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package backtype.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-4-10")
+public class LSSupervisorId implements org.apache.thrift.TBase<LSSupervisorId, LSSupervisorId._Fields>, java.io.Serializable, Cloneable, Comparable<LSSupervisorId> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LSSupervisorId");
+
+ private static final org.apache.thrift.protocol.TField SUPERVISOR_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("supervisor_id", org.apache.thrift.protocol.TType.STRING, (short)1);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new LSSupervisorIdStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new LSSupervisorIdTupleSchemeFactory());
+ }
+
+ private String supervisor_id; // required
+
+ /** Enumerates the fields of this struct and offers lookups by Thrift id and by name. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ SUPERVISOR_ID((short)1, "supervisor_id");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields constant : values()) {
+ byName.put(constant._fieldName, constant);
+ }
+ }
+
+ /**
+ * Look up the constant for a Thrift field id; the result is null for an unknown id.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ // This struct defines a single field id;
+ // every other value falls through to null.
+ if (fieldId == 1) { // SUPERVISOR_ID
+ return SUPERVISOR_ID;
+ }
+ return null;
+ }
+
+ /**
+ * Look up the constant for a Thrift field id, throwing
+ * IllegalArgumentException when the id is unknown.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ final _Fields match = findByThriftId(fieldId);
+ if (match != null) return match;
+ throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ }
+
+ /**
+ * Look up the constant by its field name; the result is null for an unknown name.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ this._thriftId = thriftId;
+ this._fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return this._thriftId;
+ }
+
+ public String getFieldName() {
+ return this._fieldName;
+ }
+ }
+
+ // isset id assignments (none in this struct: its only field is an object reference, whose presence is tracked by null-ness)
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; // unmodifiable; built once in the static block below
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.SUPERVISOR_ID, new org.apache.thrift.meta_data.FieldMetaData("supervisor_id", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LSSupervisorId.class, metaDataMap); // also hands the map to Thrift's FieldMetaData, keyed by this class
+ }
+
+ public LSSupervisorId() {
+ }
+
+ public LSSupervisorId(
+ String supervisor_id)
+ {
+ this();
+ this.supervisor_id = supervisor_id;
+ }
+
+ /**
+ * Copy constructor: takes over <i>other</i>'s supervisor_id (a String, so sharing the reference is safe).
+ */
+ public LSSupervisorId(LSSupervisorId other) {
+ this.supervisor_id = other.is_set_supervisor_id()
+ ? other.supervisor_id
+ : null;
+ }
+
+ public LSSupervisorId deepCopy() {
+ return new LSSupervisorId(this);
+ }
+
+ @Override
+ public void clear() {
+ this.supervisor_id = null;
+ }
+
+ public String get_supervisor_id() {
+ return this.supervisor_id;
+ }
+
+ public void set_supervisor_id(String supervisor_id) {
+ this.supervisor_id = supervisor_id;
+ }
+
+ public void unset_supervisor_id() {
+ this.supervisor_id = null;
+ }
+
+ /** Returns true if field supervisor_id is set (has been assigned a value) and false otherwise */
+ public boolean is_set_supervisor_id() {
+ return this.supervisor_id != null;
+ }
+
+ public void set_supervisor_id_isSet(boolean value) {
+ // Presence is tracked by non-nullness, so only "unset" has an effect.
+ if (value) return;
+ this.supervisor_id = null;
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ // A null value means "unset"; anything else is cast to the field's type.
+ switch (field) {
+ case SUPERVISOR_ID:
+ if (value != null) {
+ set_supervisor_id((String)value);
+ break;
+ }
+ unset_supervisor_id();
+ break;
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case SUPERVISOR_ID:
+ return get_supervisor_id();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case SUPERVISOR_ID:
+ return is_set_supervisor_id();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ // instanceof is false for null, so one test covers both rejections.
+ if (!(that instanceof LSSupervisorId)) {
+ return false;
+ }
+ return this.equals((LSSupervisorId)that);
+ }
+
+ public boolean equals(LSSupervisorId that) {
+ if (that == null)
+ return false;
+
+ // Equal when both sides leave supervisor_id unset,
+ // or both have it set to equal strings.
+ final boolean mineSet = this.is_set_supervisor_id();
+ final boolean theirsSet = that.is_set_supervisor_id();
+ if (mineSet != theirsSet)
+ return false;
+ if (mineSet && !this.supervisor_id.equals(that.supervisor_id))
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ // The hash is List.hashCode() over [is-set flag, value when set].
+ final boolean hasSupervisorId = is_set_supervisor_id();
+ final List<Object> parts = new ArrayList<Object>();
+ parts.add(hasSupervisorId);
+ if (hasSupervisorId) {
+ parts.add(supervisor_id);
+ }
+ return parts.hashCode();
+ }
+
+ @Override
+ public int compareTo(LSSupervisorId other) {
+ // Instances of different concrete classes are ordered by class name.
+ final Class<?> mine = getClass();
+ final Class<?> theirs = other.getClass();
+ if (!mine.equals(theirs)) {
+ return mine.getName().compareTo(theirs.getName());
+ }
+
+ // Next, order by whether supervisor_id is set at all (unset sorts first).
+ final boolean mineSet = is_set_supervisor_id();
+ final int bySetFlag = Boolean.valueOf(mineSet).compareTo(other.is_set_supervisor_id());
+ if (bySetFlag != 0) {
+ return bySetFlag;
+ }
+ if (!mineSet) {
+ return 0;
+ }
+ return org.apache.thrift.TBaseHelper.compareTo(this.supervisor_id, other.supervisor_id);
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ /** Renders this struct as {@code LSSupervisorId(supervisor_id:VALUE)}, with VALUE being the text "null" when unset. */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("LSSupervisorId(");
+
+ sb.append("supervisor_id:");
+ // Spell out "null" explicitly for an unset id.
+ if (this.supervisor_id == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.supervisor_id);
+ }
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // supervisor_id is the only required field; there are no sub-structs to check
+ final boolean complete = is_set_supervisor_id();
+ if (complete) {
+ return;
+ }
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'supervisor_id' is unset! Struct:" + toString());
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { // Java-serialization hook: the payload is this struct written through a TCompactProtocol
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // the Thrift failure is kept as the cause
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { // Java-deserialization hook: mirror image of writeObject
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // the Thrift failure is kept as the cause
+ }
+ }
+
+ private static class LSSupervisorIdStandardSchemeFactory implements SchemeFactory { // registered under StandardScheme.class in the schemes map
+ public LSSupervisorIdStandardScheme getScheme() {
+ return new LSSupervisorIdStandardScheme(); // a fresh scheme object per call
+ }
+ }
+
+ private static class LSSupervisorIdStandardScheme extends StandardScheme<LSSupervisorId> { // field-tagged encoding: each field is wrapped in begin/end markers
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, LSSupervisorId struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { // STOP ends the field list
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // SUPERVISOR_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.supervisor_id = iprot.readString();
+ struct.set_supervisor_id_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // known id but unexpected wire type: skip the value
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // unknown field id: skip the value
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate(); // throws if supervisor_id is still unset after reading
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, LSSupervisorId struct) throws org.apache.thrift.TException {
+ struct.validate(); // refuse to serialize while the required field is unset
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.supervisor_id != null) {
+ oprot.writeFieldBegin(SUPERVISOR_ID_FIELD_DESC);
+ oprot.writeString(struct.supervisor_id);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class LSSupervisorIdTupleSchemeFactory implements SchemeFactory { // registered under TupleScheme.class in the schemes map
+ public LSSupervisorIdTupleScheme getScheme() {
+ return new LSSupervisorIdTupleScheme(); // a fresh scheme object per call
+ }
+ }
+
+ private static class LSSupervisorIdTupleScheme extends TupleScheme<LSSupervisorId> { // tuple encoding: just the supervisor_id string, no field headers
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, LSSupervisorId struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ oprot.writeString(struct.supervisor_id); // NOTE(review): no validate() call on this path, unlike the standard scheme's write()
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, LSSupervisorId struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ struct.supervisor_id = iprot.readString();
+ struct.set_supervisor_id_isSet(true); // has no effect for true: the field counts as set by being non-null
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/ad4375e8/storm-core/src/jvm/backtype/storm/generated/LSWorkerHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/LSWorkerHeartbeat.java b/storm-core/src/jvm/backtype/storm/generated/LSWorkerHeartbeat.java
new file mode 100644
index 0000000..80f917b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/LSWorkerHeartbeat.java
@@ -0,0 +1,755 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package backtype.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-4-10")
+public class LSWorkerHeartbeat implements org.apache.thrift.TBase<LSWorkerHeartbeat, LSWorkerHeartbeat._Fields>, java.io.Serializable, Cloneable, Comparable<LSWorkerHeartbeat> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LSWorkerHeartbeat");
+
+ private static final org.apache.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("time_secs", org.apache.thrift.protocol.TType.I32, (short)1);
+ private static final org.apache.thrift.protocol.TField TOPOLOGY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("topology_id", org.apache.thrift.protocol.TType.STRING, (short)2);
+ private static final org.apache.thrift.protocol.TField EXECUTORS_FIELD_DESC = new org.apache.thrift.protocol.TField("executors", org.apache.thrift.protocol.TType.LIST, (short)3);
+ private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)4);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new LSWorkerHeartbeatStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new LSWorkerHeartbeatTupleSchemeFactory());
+ }
+
+ private int time_secs; // required
+ private String topology_id; // required
+ private List<ExecutorInfo> executors; // required
+ private int port; // required
+
+ /** Enumerates the four fields of this struct and offers lookups by Thrift id and by name. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ TIME_SECS((short)1, "time_secs"),
+ TOPOLOGY_ID((short)2, "topology_id"),
+ EXECUTORS((short)3, "executors"),
+ PORT((short)4, "port");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields constant : values()) {
+ byName.put(constant._fieldName, constant);
+ }
+ }
+
+ /**
+ * Look up the constant for a Thrift field id; the result is null for an unknown id.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ // One branch per declared field id, in declaration order.
+ if (fieldId == 1) { // TIME_SECS
+ return TIME_SECS;
+ } else if (fieldId == 2) { // TOPOLOGY_ID
+ return TOPOLOGY_ID;
+ } else if (fieldId == 3) { // EXECUTORS
+ return EXECUTORS;
+ } else if (fieldId == 4) { // PORT
+ return PORT;
+ }
+ // every other id is unknown
+ return null;
+ }
+
+ /**
+ * Look up the constant for a Thrift field id, throwing
+ * IllegalArgumentException when the id is unknown.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ final _Fields match = findByThriftId(fieldId);
+ if (match != null) return match;
+ throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ }
+
+ /**
+ * Look up the constant by its field name; the result is null for an unknown name.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ this._thriftId = thriftId;
+ this._fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return this._thriftId;
+ }
+
+ public String getFieldName() {
+ return this._fieldName;
+ }
+ }
+
+ // isset id assignments
+ private static final int __TIME_SECS_ISSET_ID = 0; // index handed to EncodingUtils.setBit/testBit/clearBit for time_secs
+ private static final int __PORT_ISSET_ID = 1; // index handed to EncodingUtils.setBit/testBit/clearBit for port
+ private byte __isset_bitfield = 0; // "is set" flags for the two int fields; topology_id and executors use null-ness instead
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; // unmodifiable; built once in the static block below
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.TIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("time_secs", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+ tmpMap.put(_Fields.TOPOLOGY_ID, new org.apache.thrift.meta_data.FieldMetaData("topology_id", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.EXECUTORS, new org.apache.thrift.meta_data.FieldMetaData("executors", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ExecutorInfo.class))));
+ tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LSWorkerHeartbeat.class, metaDataMap); // also hands the map to Thrift's FieldMetaData, keyed by this class
+ }
+
+ public LSWorkerHeartbeat() {
+ }
+
+ public LSWorkerHeartbeat(
+ int time_secs,
+ String topology_id,
+ List<ExecutorInfo> executors,
+ int port)
+ {
+ this();
+ this.time_secs = time_secs;
+ set_time_secs_isSet(true);
+ this.topology_id = topology_id;
+ this.executors = executors;
+ this.port = port;
+ set_port_isSet(true);
+ }
+
+ /**
+ * Deep-copy constructor: primitives and isset bits are copied as-is, and
+ * each executor is cloned through new ExecutorInfo(element).
+ */
+ public LSWorkerHeartbeat(LSWorkerHeartbeat other) {
+ this.__isset_bitfield = other.__isset_bitfield;
+ this.time_secs = other.time_secs;
+ this.port = other.port;
+ // topology_id is a String, so sharing the reference is fine
+ this.topology_id = other.is_set_topology_id() ? other.topology_id : null;
+ if (other.is_set_executors()) {
+ final List<ExecutorInfo> clones = new ArrayList<ExecutorInfo>(other.executors.size());
+ for (ExecutorInfo source : other.executors) {
+ clones.add(new ExecutorInfo(source));
+ }
+ this.executors = clones;
+ }
+ }
+
+ public LSWorkerHeartbeat deepCopy() {
+ return new LSWorkerHeartbeat(this);
+ }
+
+ @Override
+ public void clear() { // returns every field to its unset state
+ set_time_secs_isSet(false); // int fields: drop the isset bit, then zero the value
+ this.time_secs = 0;
+ this.topology_id = null; // object fields: null means unset
+ this.executors = null;
+ set_port_isSet(false);
+ this.port = 0;
+ }
+
+ public int get_time_secs() {
+ return this.time_secs;
+ }
+
+ public void set_time_secs(int time_secs) {
+ this.time_secs = time_secs;
+ set_time_secs_isSet(true);
+ }
+
+ public void unset_time_secs() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TIME_SECS_ISSET_ID);
+ }
+
+ /** Returns true if field time_secs is set (has been assigned a value) and false otherwise */
+ public boolean is_set_time_secs() {
+ return EncodingUtils.testBit(__isset_bitfield, __TIME_SECS_ISSET_ID);
+ }
+
+ public void set_time_secs_isSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIME_SECS_ISSET_ID, value);
+ }
+
+ public String get_topology_id() {
+ return this.topology_id;
+ }
+
+ public void set_topology_id(String topology_id) {
+ this.topology_id = topology_id;
+ }
+
+ public void unset_topology_id() {
+ this.topology_id = null;
+ }
+
+ /** Returns true if field topology_id is set (has been assigned a value) and false otherwise */
+ public boolean is_set_topology_id() {
+ return this.topology_id != null;
+ }
+
+ public void set_topology_id_isSet(boolean value) {
+ if (!value) {
+ this.topology_id = null;
+ }
+ }
+
+ public int get_executors_size() {
+ return (this.executors == null) ? 0 : this.executors.size();
+ }
+
+ public java.util.Iterator<ExecutorInfo> get_executors_iterator() {
+ return (this.executors == null) ? null : this.executors.iterator(); // yields null, not an empty iterator, while executors is unset
+ }
+
+ public void add_to_executors(ExecutorInfo elem) {
+ if (this.executors == null) {
+ this.executors = new ArrayList<ExecutorInfo>();
+ }
+ this.executors.add(elem);
+ }
+
+ public List<ExecutorInfo> get_executors() {
+ return this.executors;
+ }
+
+ public void set_executors(List<ExecutorInfo> executors) {
+ this.executors = executors;
+ }
+
+ public void unset_executors() {
+ this.executors = null;
+ }
+
+ /** Returns true if field executors is set (has been assigned a value) and false otherwise */
+ public boolean is_set_executors() {
+ return this.executors != null;
+ }
+
+ public void set_executors_isSet(boolean value) {
+ if (!value) {
+ this.executors = null;
+ }
+ }
+
+ public int get_port() {
+ return this.port;
+ }
+
+ public void set_port(int port) {
+ this.port = port;
+ set_port_isSet(true);
+ }
+
+ public void unset_port() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PORT_ISSET_ID);
+ }
+
+ /** Returns true if field port is set (has been assigned a value) and false otherwise */
+ public boolean is_set_port() {
+ return EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID);
+ }
+
+ public void set_port_isSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, value);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case TIME_SECS:
+ if (value == null) {
+ unset_time_secs();
+ } else {
+ set_time_secs((Integer)value);
+ }
+ break;
+
+ case TOPOLOGY_ID:
+ if (value == null) {
+ unset_topology_id();
+ } else {
+ set_topology_id((String)value);
+ }
+ break;
+
+ case EXECUTORS:
+ if (value == null) {
+ unset_executors();
+ } else {
+ set_executors((List<ExecutorInfo>)value);
+ }
+ break;
+
+ case PORT:
+ if (value == null) {
+ unset_port();
+ } else {
+ set_port((Integer)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case TIME_SECS:
+ return Integer.valueOf(get_time_secs());
+
+ case TOPOLOGY_ID:
+ return get_topology_id();
+
+ case EXECUTORS:
+ return get_executors();
+
+ case PORT:
+ return Integer.valueOf(get_port());
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case TIME_SECS:
+ return is_set_time_secs();
+ case TOPOLOGY_ID:
+ return is_set_topology_id();
+ case EXECUTORS:
+ return is_set_executors();
+ case PORT:
+ return is_set_port();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ // instanceof is false for null, so one test covers both rejections.
+ if (!(that instanceof LSWorkerHeartbeat)) {
+ return false;
+ }
+ return this.equals((LSWorkerHeartbeat)that);
+ }
+
+ /**
+ * Field-by-field equality against another LSWorkerHeartbeat.
+ * <p>
+ * time_secs and port are compared by value only; their isset bits are not
+ * consulted. The generated original expressed this with "present" flags
+ * hard-wired to true, which left two return-false branches unreachable;
+ * those dead branches are gone, the result is unchanged.
+ * topology_id and executors must be unset on both sides or set to equal
+ * values.
+ *
+ * @param that the instance to compare with; null yields false
+ * @return true when all four fields match as described above
+ */
+ public boolean equals(LSWorkerHeartbeat that) {
+ if (that == null)
+ return false;
+
+ // int field: plain value comparison
+ if (this.time_secs != that.time_secs)
+ return false;
+
+ // object field: same set-ness first, then equals()
+ boolean this_present_topology_id = this.is_set_topology_id();
+ boolean that_present_topology_id = that.is_set_topology_id();
+ if (this_present_topology_id != that_present_topology_id)
+ return false;
+ if (this_present_topology_id && !this.topology_id.equals(that.topology_id))
+ return false;
+
+ // object field: same set-ness first, then List.equals()
+ boolean this_present_executors = this.is_set_executors();
+ boolean that_present_executors = that.is_set_executors();
+ if (this_present_executors != that_present_executors)
+ return false;
+ if (this_present_executors && !this.executors.equals(that.executors))
+ return false;
+
+ if (this.port != that.port)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ // The hash is List.hashCode() over, per field in declaration order,
+ // a "present" flag followed by the value when present. For the two
+ // int fields the flag is the constant true, so the isset bits do not
+ // influence the result.
+ final List<Object> parts = new ArrayList<Object>();
+
+ parts.add(true);
+ parts.add(time_secs);
+
+ final boolean hasTopologyId = is_set_topology_id();
+ parts.add(hasTopologyId);
+ if (hasTopologyId)
+ parts.add(topology_id);
+
+ final boolean hasExecutors = is_set_executors();
+ parts.add(hasExecutors);
+ if (hasExecutors)
+ parts.add(executors);
+
+ parts.add(true);
+ parts.add(port);
+
+ return parts.hashCode();
+ }
+
+ @Override
+ public int compareTo(LSWorkerHeartbeat other) { // NOTE(review): consults the isset bits of time_secs/port, which equals(LSWorkerHeartbeat) does not
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName()); // different concrete classes: order by class name
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_time_secs()).compareTo(other.is_set_time_secs()); // unset sorts before set (false < true)
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_time_secs()) { // both sides are set here, so the values can be compared
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.time_secs, other.time_secs);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_topology_id()).compareTo(other.is_set_topology_id()); // same two-step pattern for topology_id
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_topology_id()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.topology_id, other.topology_id);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_executors()).compareTo(other.is_set_executors()); // same two-step pattern for executors
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_executors()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.executors, other.executors);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_port()).compareTo(other.is_set_port()); // same two-step pattern for port
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_port()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, other.port);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0; // every field compared equal
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ /**
+ * Renders all four fields as
+ * {@code LSWorkerHeartbeat(time_secs:..., topology_id:..., executors:..., port:...)}.
+ * An unset topology_id or executors prints as the text "null".
+ */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("LSWorkerHeartbeat(");
+
+ sb.append("time_secs:");
+ sb.append(this.time_secs);
+ sb.append(", ");
+ sb.append("topology_id:");
+ if (this.topology_id == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.topology_id);
+ }
+ sb.append(", ");
+ sb.append("executors:");
+ if (this.executors == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.executors);
+ }
+ sb.append(", ");
+ sb.append("port:");
+ sb.append(this.port);
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!is_set_time_secs()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'time_secs' is unset! Struct:" + toString());
+ }
+
+ if (!is_set_topology_id()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'topology_id' is unset! Struct:" + toString());
+ }
+
+ if (!is_set_executors()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'executors' is unset! Struct:" + toString());
+ }
+
+ if (!is_set_port()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'port' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { // Java-deserialization hook
+ try {
+ // Java deserialization does not call the default constructor, so the isset bits are cleared explicitly before the fields are read back in.
+ __isset_bitfield = 0;
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); // repopulates the struct through a TCompactProtocol over the stream
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // the Thrift failure is kept as the cause
+ }
+ }
+
+ private static class LSWorkerHeartbeatStandardSchemeFactory implements SchemeFactory { // registered under StandardScheme.class in the schemes map
+ public LSWorkerHeartbeatStandardScheme getScheme() {
+ return new LSWorkerHeartbeatStandardScheme(); // a fresh scheme object per call
+ }
+ }
+
+ private static class LSWorkerHeartbeatStandardScheme extends StandardScheme<LSWorkerHeartbeat> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, LSWorkerHeartbeat struct) throws org.apache.thrift.TException { // field-tagged decode; fields may arrive in any order
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { // STOP ends the field list
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // TIME_SECS
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ struct.time_secs = iprot.readI32();
+ struct.set_time_secs_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // known id but unexpected wire type: skip the value
+ }
+ break;
+ case 2: // TOPOLOGY_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.topology_id = iprot.readString();
+ struct.set_topology_id_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 3: // EXECUTORS
+ if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list474 = iprot.readListBegin();
+ struct.executors = new ArrayList<ExecutorInfo>(_list474.size); // pre-sized from the list header
+ ExecutorInfo _elem475;
+ for (int _i476 = 0; _i476 < _list474.size; ++_i476)
+ {
+ _elem475 = new ExecutorInfo();
+ _elem475.read(iprot); // each element deserializes itself from the same protocol
+ struct.executors.add(_elem475);
+ }
+ iprot.readListEnd();
+ }
+ struct.set_executors_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 4: // PORT
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ struct.port = iprot.readI32();
+ struct.set_port_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); // unknown field id: skip the value
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate(); // throws if any of the four required fields is still unset
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, LSWorkerHeartbeat struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldBegin(TIME_SECS_FIELD_DESC);
+ oprot.writeI32(struct.time_secs);
+ oprot.writeFieldEnd();
+ if (struct.topology_id != null) {
+ oprot.writeFieldBegin(TOPOLOGY_ID_FIELD_DESC);
+ oprot.writeString(struct.topology_id);
+ oprot.writeFieldEnd();
+ }
+ if (struct.executors != null) {
+ oprot.writeFieldBegin(EXECUTORS_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.executors.size()));
+ for (ExecutorInfo _iter477 : struct.executors)
+ {
+ _iter477.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldBegin(PORT_FIELD_DESC);
+ oprot.writeI32(struct.port);
+ oprot.writeFieldEnd();
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class LSWorkerHeartbeatTupleSchemeFactory implements SchemeFactory { // Factory that creates LSWorkerHeartbeatTupleScheme instances.
+ public LSWorkerHeartbeatTupleScheme getScheme() {
+ return new LSWorkerHeartbeatTupleScheme(); // a new scheme object is created on every call
+ }
+ }
+
+ private static class LSWorkerHeartbeatTupleScheme extends TupleScheme<LSWorkerHeartbeat> { // Reads and writes LSWorkerHeartbeat as a fixed-order sequence of values with no per-field headers.
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, LSWorkerHeartbeat struct) throws org.apache.thrift.TException { // Writes time_secs, topology_id, executors, port in that order; read() below must consume them in the same order.
+ TTupleProtocol oprot = (TTupleProtocol) prot; // prot must be a TTupleProtocol; any other protocol fails this cast
+ oprot.writeI32(struct.time_secs);
+ oprot.writeString(struct.topology_id);
+ {
+ oprot.writeI32(struct.executors.size()); // element count precedes the elements; note executors is dereferenced here without a null check
+ for (ExecutorInfo _iter478 : struct.executors)
+ {
+ _iter478.write(oprot);
+ }
+ }
+ oprot.writeI32(struct.port);
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, LSWorkerHeartbeat struct) throws org.apache.thrift.TException { // Reads the four values in the order write() emits them and marks each field as set.
+ TTupleProtocol iprot = (TTupleProtocol) prot; // prot must be a TTupleProtocol; any other protocol fails this cast
+ struct.time_secs = iprot.readI32();
+ struct.set_time_secs_isSet(true);
+ struct.topology_id = iprot.readString();
+ struct.set_topology_id_isSet(true);
+ {
+ org.apache.thrift.protocol.TList _list479 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); // list descriptor rebuilt locally from the I32 count read off the stream
+ struct.executors = new ArrayList<ExecutorInfo>(_list479.size);
+ ExecutorInfo _elem480;
+ for (int _i481 = 0; _i481 < _list479.size; ++_i481)
+ {
+ _elem480 = new ExecutorInfo();
+ _elem480.read(iprot); // each element deserializes itself from the same protocol
+ struct.executors.add(_elem480);
+ }
+ }
+ struct.set_executors_isSet(true);
+ struct.port = iprot.readI32();
+ struct.set_port_isSet(true);
+ }
+ }
+
+}
+
[3/4] storm git commit: Merge branch 'STORM-765-clean' of
https://github.com/revans2/incubator-storm into STORM-765
Posted by bo...@apache.org.
Merge branch 'STORM-765-clean' of https://github.com/revans2/incubator-storm into STORM-765
STORM-765: Thrift serialization for local state
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9c92d7f2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9c92d7f2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9c92d7f2
Branch: refs/heads/master
Commit: 9c92d7f2e029053e1b2f459b9c6c2c425ecf008f
Parents: 2a45a9a ad4375e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Apr 15 10:40:16 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Apr 15 10:40:16 2015 -0500
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/common.clj | 13 -
.../clj/backtype/storm/daemon/supervisor.clj | 34 +-
.../src/clj/backtype/storm/daemon/worker.clj | 14 +-
.../src/clj/backtype/storm/local_state.clj | 99 +++
storm-core/src/clj/backtype/storm/testing.clj | 6 +-
.../storm/generated/LSApprovedWorkers.java | 458 +++++++++++
.../generated/LSSupervisorAssignments.java | 471 ++++++++++++
.../storm/generated/LSSupervisorId.java | 406 ++++++++++
.../storm/generated/LSWorkerHeartbeat.java | 755 +++++++++++++++++++
.../storm/generated/LocalAssignment.java | 561 ++++++++++++++
.../storm/generated/LocalStateData.java | 471 ++++++++++++
.../storm/generated/ThriftSerializedObject.java | 516 +++++++++++++
.../jvm/backtype/storm/utils/LocalState.java | 163 +++-
storm-core/src/py/storm/ttypes.py | 594 +++++++++++++++
storm-core/src/storm.thrift | 33 +
.../clj/backtype/storm/local_state_test.clj | 40 +-
16 files changed, 4530 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9c92d7f2/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9c92d7f2/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9c92d7f2/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --cc storm-core/src/storm.thrift
index 3971a43,f3b45e6..b502181
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@@ -292,9 -292,41 +292,42 @@@ struct ClusterWorkerHeartbeat
1: required string storm_id;
2: required map<ExecutorInfo,ExecutorStats> executor_stats;
3: required i32 time_secs;
+ 4: required i32 uptime_secs;
}
+ struct ThriftSerializedObject {
+ 1: required string name;
+ 2: required binary bits;
+ }
+
+ struct LocalStateData {
+ 1: required map<string, ThriftSerializedObject> serialized_parts;
+ }
+
+ struct LocalAssignment {
+ 1: required string topology_id;
+ 2: required list<ExecutorInfo> executors;
+ }
+
+ struct LSSupervisorId {
+ 1: required string supervisor_id;
+ }
+
+ struct LSApprovedWorkers {
+ 1: required map<string, i32> approved_workers;
+ }
+
+ struct LSSupervisorAssignments {
+ 1: required map<i32, LocalAssignment> assignments;
+ }
+
+ struct LSWorkerHeartbeat {
+ 1: required i32 time_secs;
+ 2: required string topology_id;
+ 3: required list<ExecutorInfo> executors
+ 4: required i32 port;
+ }
+
enum NumErrorsChoice {
ALL,
NONE,
[4/4] storm git commit: Added STORM-765 to Changelog
Posted by bo...@apache.org.
Added STORM-765 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7c83108
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7c83108
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7c83108
Branch: refs/heads/master
Commit: a7c83108a77a2d04dfc0e43ade235b0db2921a29
Parents: 9c92d7f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Apr 15 14:09:30 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Apr 15 14:09:30 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a7c83108/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9ff4c19..16b326c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-765: Thrift serialization for local state
* STORM-762: uptime for worker heartbeats is lost when converted to thrift
* STORM-757: Simulated time can leak out on errors
* STORM-694: java.lang.ClassNotFoundException: backtype.storm.daemon.common.SupervisorInfo