You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/06/25 18:55:39 UTC
[1/2] storm git commit: STORM-3117 prevent deleting blobs while topologies still active
Repository: storm
Updated Branches:
refs/heads/master 950e527d5 -> 21bb13884
STORM-3117 prevent deleting blobs while topologies still active
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/733d58cd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/733d58cd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/733d58cd
Branch: refs/heads/master
Commit: 733d58cd687517678ebb6e78890327796fd0aef6
Parents: 7122a4e
Author: Aaron Gresch <ag...@yahoo-inc.com>
Authored: Thu Jun 21 14:56:19 2018 -0500
Committer: Aaron Gresch <ag...@yahoo-inc.com>
Committed: Mon Jun 25 13:04:25 2018 -0500
----------------------------------------------------------------------
.../storm/generated/IllegalStateException.java | 380 +++++++++++++++++++
.../jvm/org/apache/storm/generated/Nimbus.java | 130 ++++++-
.../utils/WrappedIllegalStateException.java | 32 ++
storm-client/src/py/storm/Nimbus.py | 20 +-
storm-client/src/py/storm/ttypes.py | 66 ++++
storm-client/src/storm.thrift | 6 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 27 +-
7 files changed, 649 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java b/storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java
new file mode 100644
index 0000000..4b16837
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.11.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.storm.generated;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)")
+public class IllegalStateException extends org.apache.storm.thrift.TException implements org.apache.storm.thrift.TBase<IllegalStateException, IllegalStateException._Fields>, java.io.Serializable, Cloneable, Comparable<IllegalStateException> {
+ private static final org.apache.storm.thrift.protocol.TStruct STRUCT_DESC = new org.apache.storm.thrift.protocol.TStruct("IllegalStateException");
+
+ private static final org.apache.storm.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("msg", org.apache.storm.thrift.protocol.TType.STRING, (short)1);
+
+ private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new IllegalStateExceptionStandardSchemeFactory();
+ private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new IllegalStateExceptionTupleSchemeFactory();
+
+ private java.lang.String msg; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.storm.thrift.TFieldIdEnum {
+ MSG((short)1, "msg");
+
+ private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+ static {
+ for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // MSG
+ return MSG;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(java.lang.String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final java.lang.String _fieldName;
+
+ _Fields(short thriftId, java.lang.String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public java.lang.String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.MSG, new org.apache.storm.thrift.meta_data.FieldMetaData("msg", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.STRING)));
+ metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+ org.apache.storm.thrift.meta_data.FieldMetaData.addStructMetaDataMap(IllegalStateException.class, metaDataMap);
+ }
+
+ public IllegalStateException() {
+ }
+
+ public IllegalStateException(
+ java.lang.String msg)
+ {
+ this();
+ this.msg = msg;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public IllegalStateException(IllegalStateException other) {
+ if (other.is_set_msg()) {
+ this.msg = other.msg;
+ }
+ }
+
+ public IllegalStateException deepCopy() {
+ return new IllegalStateException(this);
+ }
+
+ @Override
+ public void clear() {
+ this.msg = null;
+ }
+
+ public java.lang.String get_msg() {
+ return this.msg;
+ }
+
+ public void set_msg(java.lang.String msg) {
+ this.msg = msg;
+ }
+
+ public void unset_msg() {
+ this.msg = null;
+ }
+
+ /** Returns true if field msg is set (has been assigned a value) and false otherwise */
+ public boolean is_set_msg() {
+ return this.msg != null;
+ }
+
+ public void set_msg_isSet(boolean value) {
+ if (!value) {
+ this.msg = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, java.lang.Object value) {
+ switch (field) {
+ case MSG:
+ if (value == null) {
+ unset_msg();
+ } else {
+ set_msg((java.lang.String)value);
+ }
+ break;
+
+ }
+ }
+
+ public java.lang.Object getFieldValue(_Fields field) {
+ switch (field) {
+ case MSG:
+ return get_msg();
+
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new java.lang.IllegalArgumentException();
+ }
+
+ switch (field) {
+ case MSG:
+ return is_set_msg();
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(java.lang.Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof IllegalStateException)
+ return this.equals((IllegalStateException)that);
+ return false;
+ }
+
+ public boolean equals(IllegalStateException that) {
+ if (that == null)
+ return false;
+ if (this == that)
+ return true;
+
+ boolean this_present_msg = true && this.is_set_msg();
+ boolean that_present_msg = true && that.is_set_msg();
+ if (this_present_msg || that_present_msg) {
+ if (!(this_present_msg && that_present_msg))
+ return false;
+ if (!this.msg.equals(that.msg))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = 1;
+
+ hashCode = hashCode * 8191 + ((is_set_msg()) ? 131071 : 524287);
+ if (is_set_msg())
+ hashCode = hashCode * 8191 + msg.hashCode();
+
+ return hashCode;
+ }
+
+ @Override
+ public int compareTo(IllegalStateException other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = java.lang.Boolean.valueOf(is_set_msg()).compareTo(other.is_set_msg());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_msg()) {
+ lastComparison = org.apache.storm.thrift.TBaseHelper.compareTo(this.msg, other.msg);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.storm.thrift.protocol.TProtocol iprot) throws org.apache.storm.thrift.TException {
+ scheme(iprot).read(iprot, this);
+ }
+
+ public void write(org.apache.storm.thrift.protocol.TProtocol oprot) throws org.apache.storm.thrift.TException {
+ scheme(oprot).write(oprot, this);
+ }
+
+ @Override
+ public java.lang.String toString() {
+ java.lang.StringBuilder sb = new java.lang.StringBuilder("IllegalStateException(");
+ boolean first = true;
+
+ sb.append("msg:");
+ if (this.msg == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.msg);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.storm.thrift.TException {
+ // check for required fields
+ if (!is_set_msg()) {
+ throw new org.apache.storm.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.storm.thrift.protocol.TCompactProtocol(new org.apache.storm.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.storm.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+ try {
+ read(new org.apache.storm.thrift.protocol.TCompactProtocol(new org.apache.storm.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.storm.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class IllegalStateExceptionStandardSchemeFactory implements org.apache.storm.thrift.scheme.SchemeFactory {
+ public IllegalStateExceptionStandardScheme getScheme() {
+ return new IllegalStateExceptionStandardScheme();
+ }
+ }
+
+ private static class IllegalStateExceptionStandardScheme extends org.apache.storm.thrift.scheme.StandardScheme<IllegalStateException> {
+
+ public void read(org.apache.storm.thrift.protocol.TProtocol iprot, IllegalStateException struct) throws org.apache.storm.thrift.TException {
+ org.apache.storm.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.storm.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // MSG
+ if (schemeField.type == org.apache.storm.thrift.protocol.TType.STRING) {
+ struct.msg = iprot.readString();
+ struct.set_msg_isSet(true);
+ } else {
+ org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.storm.thrift.protocol.TProtocol oprot, IllegalStateException struct) throws org.apache.storm.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.msg != null) {
+ oprot.writeFieldBegin(MSG_FIELD_DESC);
+ oprot.writeString(struct.msg);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class IllegalStateExceptionTupleSchemeFactory implements org.apache.storm.thrift.scheme.SchemeFactory {
+ public IllegalStateExceptionTupleScheme getScheme() {
+ return new IllegalStateExceptionTupleScheme();
+ }
+ }
+
+ private static class IllegalStateExceptionTupleScheme extends org.apache.storm.thrift.scheme.TupleScheme<IllegalStateException> {
+
+ @Override
+ public void write(org.apache.storm.thrift.protocol.TProtocol prot, IllegalStateException struct) throws org.apache.storm.thrift.TException {
+ org.apache.storm.thrift.protocol.TTupleProtocol oprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot;
+ oprot.writeString(struct.msg);
+ }
+
+ @Override
+ public void read(org.apache.storm.thrift.protocol.TProtocol prot, IllegalStateException struct) throws org.apache.storm.thrift.TException {
+ org.apache.storm.thrift.protocol.TTupleProtocol iprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot;
+ struct.msg = iprot.readString();
+ struct.set_msg_isSet(true);
+ }
+ }
+
+ private static <S extends org.apache.storm.thrift.scheme.IScheme> S scheme(org.apache.storm.thrift.protocol.TProtocol proto) {
+ return (org.apache.storm.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java b/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
index 9fcc8ee..de9a3b3 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
@@ -85,7 +85,7 @@ public class Nimbus {
public java.nio.ByteBuffer downloadBlobChunk(java.lang.String session) throws AuthorizationException, org.apache.storm.thrift.TException;
- public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException;
+ public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException;
public ListBlobsResult listBlobs(java.lang.String session) throws org.apache.storm.thrift.TException;
@@ -890,7 +890,7 @@ public class Nimbus {
throw new org.apache.storm.thrift.TApplicationException(org.apache.storm.thrift.TApplicationException.MISSING_RESULT, "downloadBlobChunk failed: unknown result");
}
- public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException
+ public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException
{
send_deleteBlob(key);
recv_deleteBlob();
@@ -903,7 +903,7 @@ public class Nimbus {
sendBase("deleteBlob", args);
}
- public void recv_deleteBlob() throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException
+ public void recv_deleteBlob() throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException
{
deleteBlob_result result = new deleteBlob_result();
receiveBase(result, "deleteBlob");
@@ -913,6 +913,9 @@ public class Nimbus {
if (result.knf != null) {
throw result.knf;
}
+ if (result.ise != null) {
+ throw result.ise;
+ }
return;
}
@@ -2460,7 +2463,7 @@ public class Nimbus {
prot.writeMessageEnd();
}
- public Void getResult() throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException {
+ public Void getResult() throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException {
if (getState() != org.apache.storm.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
throw new java.lang.IllegalStateException("Method call not finished!");
}
@@ -4135,6 +4138,8 @@ public class Nimbus {
result.aze = aze;
} catch (KeyNotFoundException knf) {
result.knf = knf;
+ } catch (IllegalStateException ise) {
+ result.ise = ise;
}
return result;
}
@@ -6540,6 +6545,10 @@ public class Nimbus {
result.knf = (KeyNotFoundException) e;
result.set_knf_isSet(true);
msg = result;
+ } else if (e instanceof IllegalStateException) {
+ result.ise = (IllegalStateException) e;
+ result.set_ise_isSet(true);
+ msg = result;
} else if (e instanceof org.apache.storm.thrift.transport.TTransportException) {
_LOGGER.error("TTransportException inside handler", e);
fb.close();
@@ -29008,17 +29017,20 @@ public class Nimbus {
private static final org.apache.storm.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("aze", org.apache.storm.thrift.protocol.TType.STRUCT, (short)1);
private static final org.apache.storm.thrift.protocol.TField KNF_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("knf", org.apache.storm.thrift.protocol.TType.STRUCT, (short)2);
+ private static final org.apache.storm.thrift.protocol.TField ISE_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("ise", org.apache.storm.thrift.protocol.TType.STRUCT, (short)3);
private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new deleteBlob_resultStandardSchemeFactory();
private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new deleteBlob_resultTupleSchemeFactory();
private AuthorizationException aze; // required
private KeyNotFoundException knf; // required
+ private IllegalStateException ise; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.storm.thrift.TFieldIdEnum {
AZE((short)1, "aze"),
- KNF((short)2, "knf");
+ KNF((short)2, "knf"),
+ ISE((short)3, "ise");
private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
@@ -29037,6 +29049,8 @@ public class Nimbus {
return AZE;
case 2: // KNF
return KNF;
+ case 3: // ISE
+ return ISE;
default:
return null;
}
@@ -29084,6 +29098,8 @@ public class Nimbus {
new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, AuthorizationException.class)));
tmpMap.put(_Fields.KNF, new org.apache.storm.thrift.meta_data.FieldMetaData("knf", org.apache.storm.thrift.TFieldRequirementType.DEFAULT,
new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, KeyNotFoundException.class)));
+ tmpMap.put(_Fields.ISE, new org.apache.storm.thrift.meta_data.FieldMetaData("ise", org.apache.storm.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, IllegalStateException.class)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.storm.thrift.meta_data.FieldMetaData.addStructMetaDataMap(deleteBlob_result.class, metaDataMap);
}
@@ -29093,11 +29109,13 @@ public class Nimbus {
public deleteBlob_result(
AuthorizationException aze,
- KeyNotFoundException knf)
+ KeyNotFoundException knf,
+ IllegalStateException ise)
{
this();
this.aze = aze;
this.knf = knf;
+ this.ise = ise;
}
/**
@@ -29110,6 +29128,9 @@ public class Nimbus {
if (other.is_set_knf()) {
this.knf = new KeyNotFoundException(other.knf);
}
+ if (other.is_set_ise()) {
+ this.ise = new IllegalStateException(other.ise);
+ }
}
public deleteBlob_result deepCopy() {
@@ -29120,6 +29141,7 @@ public class Nimbus {
public void clear() {
this.aze = null;
this.knf = null;
+ this.ise = null;
}
public AuthorizationException get_aze() {
@@ -29168,6 +29190,29 @@ public class Nimbus {
}
}
+ public IllegalStateException get_ise() {
+ return this.ise;
+ }
+
+ public void set_ise(IllegalStateException ise) {
+ this.ise = ise;
+ }
+
+ public void unset_ise() {
+ this.ise = null;
+ }
+
+ /** Returns true if field ise is set (has been assigned a value) and false otherwise */
+ public boolean is_set_ise() {
+ return this.ise != null;
+ }
+
+ public void set_ise_isSet(boolean value) {
+ if (!value) {
+ this.ise = null;
+ }
+ }
+
public void setFieldValue(_Fields field, java.lang.Object value) {
switch (field) {
case AZE:
@@ -29186,6 +29231,14 @@ public class Nimbus {
}
break;
+ case ISE:
+ if (value == null) {
+ unset_ise();
+ } else {
+ set_ise((IllegalStateException)value);
+ }
+ break;
+
}
}
@@ -29197,6 +29250,9 @@ public class Nimbus {
case KNF:
return get_knf();
+ case ISE:
+ return get_ise();
+
}
throw new java.lang.IllegalStateException();
}
@@ -29212,6 +29268,8 @@ public class Nimbus {
return is_set_aze();
case KNF:
return is_set_knf();
+ case ISE:
+ return is_set_ise();
}
throw new java.lang.IllegalStateException();
}
@@ -29249,6 +29307,15 @@ public class Nimbus {
return false;
}
+ boolean this_present_ise = true && this.is_set_ise();
+ boolean that_present_ise = true && that.is_set_ise();
+ if (this_present_ise || that_present_ise) {
+ if (!(this_present_ise && that_present_ise))
+ return false;
+ if (!this.ise.equals(that.ise))
+ return false;
+ }
+
return true;
}
@@ -29264,6 +29331,10 @@ public class Nimbus {
if (is_set_knf())
hashCode = hashCode * 8191 + knf.hashCode();
+ hashCode = hashCode * 8191 + ((is_set_ise()) ? 131071 : 524287);
+ if (is_set_ise())
+ hashCode = hashCode * 8191 + ise.hashCode();
+
return hashCode;
}
@@ -29295,6 +29366,16 @@ public class Nimbus {
return lastComparison;
}
}
+ lastComparison = java.lang.Boolean.valueOf(is_set_ise()).compareTo(other.is_set_ise());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_ise()) {
+ lastComparison = org.apache.storm.thrift.TBaseHelper.compareTo(this.ise, other.ise);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -29330,6 +29411,14 @@ public class Nimbus {
sb.append(this.knf);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("ise:");
+ if (this.ise == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.ise);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -29391,6 +29480,15 @@ public class Nimbus {
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 3: // ISE
+ if (schemeField.type == org.apache.storm.thrift.protocol.TType.STRUCT) {
+ struct.ise = new IllegalStateException();
+ struct.ise.read(iprot);
+ struct.set_ise_isSet(true);
+ } else {
+ org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -29414,6 +29512,11 @@ public class Nimbus {
struct.knf.write(oprot);
oprot.writeFieldEnd();
}
+ if (struct.ise != null) {
+ oprot.writeFieldBegin(ISE_FIELD_DESC);
+ struct.ise.write(oprot);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -29438,19 +29541,25 @@ public class Nimbus {
if (struct.is_set_knf()) {
optionals.set(1);
}
- oprot.writeBitSet(optionals, 2);
+ if (struct.is_set_ise()) {
+ optionals.set(2);
+ }
+ oprot.writeBitSet(optionals, 3);
if (struct.is_set_aze()) {
struct.aze.write(oprot);
}
if (struct.is_set_knf()) {
struct.knf.write(oprot);
}
+ if (struct.is_set_ise()) {
+ struct.ise.write(oprot);
+ }
}
@Override
public void read(org.apache.storm.thrift.protocol.TProtocol prot, deleteBlob_result struct) throws org.apache.storm.thrift.TException {
org.apache.storm.thrift.protocol.TTupleProtocol iprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot;
- java.util.BitSet incoming = iprot.readBitSet(2);
+ java.util.BitSet incoming = iprot.readBitSet(3);
if (incoming.get(0)) {
struct.aze = new AuthorizationException();
struct.aze.read(iprot);
@@ -29461,6 +29570,11 @@ public class Nimbus {
struct.knf.read(iprot);
struct.set_knf_isSet(true);
}
+ if (incoming.get(2)) {
+ struct.ise = new IllegalStateException();
+ struct.ise.read(iprot);
+ struct.set_ise_isSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java b/storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java
new file mode 100644
index 0000000..3df9c0a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.generated.IllegalStateException;
+
+public class WrappedIllegalStateException extends IllegalStateException {
+ public WrappedIllegalStateException(String msg) {
+ super(msg);
+ }
+
+ @Override
+ public String getMessage() {
+ return this.get_msg();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/Nimbus.py b/storm-client/src/py/storm/Nimbus.py
index 182dbe2..a4e6484 100644
--- a/storm-client/src/py/storm/Nimbus.py
+++ b/storm-client/src/py/storm/Nimbus.py
@@ -1235,6 +1235,8 @@ class Client(Iface):
raise result.aze
if result.knf is not None:
raise result.knf
+ if result.ise is not None:
+ raise result.ise
return
def listBlobs(self, session):
@@ -2879,6 +2881,9 @@ class Processor(Iface, TProcessor):
except KeyNotFoundException as knf:
msg_type = TMessageType.REPLY
result.knf = knf
+ except IllegalStateException as ise:
+ msg_type = TMessageType.REPLY
+ result.ise = ise
except TApplicationException as ex:
logging.exception('TApplication exception in handler')
msg_type = TMessageType.EXCEPTION
@@ -6928,12 +6933,14 @@ class deleteBlob_result(object):
Attributes:
- aze
- knf
+ - ise
"""
- def __init__(self, aze=None, knf=None,):
+ def __init__(self, aze=None, knf=None, ise=None,):
self.aze = aze
self.knf = knf
+ self.ise = ise
def read(self, iprot):
if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -6956,6 +6963,12 @@ class deleteBlob_result(object):
self.knf.read(iprot)
else:
iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRUCT:
+ self.ise = IllegalStateException()
+ self.ise.read(iprot)
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -6974,6 +6987,10 @@ class deleteBlob_result(object):
oprot.writeFieldBegin('knf', TType.STRUCT, 2)
self.knf.write(oprot)
oprot.writeFieldEnd()
+ if self.ise is not None:
+ oprot.writeFieldBegin('ise', TType.STRUCT, 3)
+ self.ise.write(oprot)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -6995,6 +7012,7 @@ deleteBlob_result.thrift_spec = (
None, # 0
(1, TType.STRUCT, 'aze', [AuthorizationException, None], None, ), # 1
(2, TType.STRUCT, 'knf', [KeyNotFoundException, None], None, ), # 2
+ (3, TType.STRUCT, 'ise', [IllegalStateException, None], None, ), # 3
)
http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py
index 48d2886..0d7ea12 100644
--- a/storm-client/src/py/storm/ttypes.py
+++ b/storm-client/src/py/storm/ttypes.py
@@ -1921,6 +1921,67 @@ class KeyNotFoundException(TException):
return not (self == other)
+class IllegalStateException(TException):
+ """
+ Attributes:
+ - msg
+ """
+
+
+ def __init__(self, msg=None,):
+ self.msg = msg
+
+ def read(self, iprot):
+ if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
+ iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec])
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.msg = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot._fast_encode is not None and self.thrift_spec is not None:
+ oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec]))
+ return
+ oprot.writeStructBegin('IllegalStateException')
+ if self.msg is not None:
+ oprot.writeFieldBegin('msg', TType.STRING, 1)
+ oprot.writeString(self.msg.encode('utf-8') if sys.version_info[0] == 2 else self.msg)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.msg is None:
+ raise TProtocolException(message='Required field msg is unset!')
+ return
+
+ def __str__(self):
+ return repr(self)
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.items()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+
class KeyAlreadyExistsException(TException):
"""
Attributes:
@@ -10201,6 +10262,11 @@ KeyNotFoundException.thrift_spec = (
None, # 0
(1, TType.STRING, 'msg', 'UTF8', None, ), # 1
)
+all_structs.append(IllegalStateException)
+IllegalStateException.thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'msg', 'UTF8', None, ), # 1
+)
all_structs.append(KeyAlreadyExistsException)
KeyAlreadyExistsException.thrift_spec = (
None, # 0
http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index fefc6db..724c5b4 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -153,6 +153,10 @@ exception KeyNotFoundException {
1: required string msg;
}
+exception IllegalStateException {
+ 1: required string msg;
+}
+
exception KeyAlreadyExistsException {
1: required string msg;
}
@@ -744,7 +748,7 @@ service Nimbus {
void setBlobMeta(1: string key, 2: SettableBlobMeta meta) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
BeginDownloadResult beginBlobDownload(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
binary downloadBlobChunk(1: string session) throws (1: AuthorizationException aze);
- void deleteBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+ void deleteBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf, 3: IllegalStateException ise);
ListBlobsResult listBlobs(1: string session); //empty string "" means start at the beginning
i32 getBlobReplication(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
i32 updateBlobReplication(1: string key, 2: i32 replication) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index e19d0ef..3c16745 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -87,6 +87,7 @@ import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.GetInfoOptions;
+import org.apache.storm.generated.IllegalStateException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
@@ -202,6 +203,7 @@ import org.apache.storm.utils.Utils.UptimeComputer;
import org.apache.storm.utils.VersionInfo;
import org.apache.storm.utils.WrappedAlreadyAliveException;
import org.apache.storm.utils.WrappedAuthorizationException;
+import org.apache.storm.utils.WrappedIllegalStateException;
import org.apache.storm.utils.WrappedInvalidTopologyException;
import org.apache.storm.utils.WrappedNotAliveException;
import org.apache.storm.validation.ConfigValidation;
@@ -820,10 +822,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return ret;
}
+ /**
+ * NOTE: this can return false when a topology has just been activated. The topology may still be
+ * in the STORMS_SUBTREE.
+ */
private static boolean isTopologyActive(IStormClusterState state, String topoName) {
return state.getTopoId(topoName).isPresent();
}
+ private static boolean isTopologyActiveOrActivating(IStormClusterState state, String topoName) {
+ return isTopologyActive(state, topoName) || state.activeStorms().contains(topoName);
+ }
+
private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc)
throws NotAliveException, AuthorizationException, IOException {
try {
@@ -2672,7 +2682,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
summary.set_assigned_memoffheap(resources.getAssignedMemOffHeap());
summary.set_assigned_cpu(resources.getAssignedCpu());
}
- summary.set_replication_count(getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId)));
+ try {
+ summary.set_replication_count(getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId)));
+ } catch (KeyNotFoundException e) {
+ // This could fail if a blob gets deleted by mistake. Don't crash nimbus.
+ LOG.error("Unable to find blob entry", e);
+ }
topologySummaries.add(summary);
}
@@ -3555,8 +3570,16 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
@Override
- public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException, TException {
+ public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException, IllegalStateException, TException {
try {
+ String topoName = ConfigUtils.getIdFromBlobKey(key);
+ if (topoName != null) {
+ if (isTopologyActiveOrActivating(stormClusterState, topoName)) {
+ String message = "Attempting to delete blob " + key + " from under active topology " + topoName;
+ LOG.warn(message);
+ throw new WrappedIllegalStateException(message);
+ }
+ }
blobStore.deleteBlob(key, getSubject());
LOG.info("Deleted blob for key {}", key);
} catch (Exception e) {
[2/2] storm git commit: Merge branch 'agresch_blob_delete' of
https://github.com/agresch/storm into STORM-3117
Posted by bo...@apache.org.
Merge branch 'agresch_blob_delete' of https://github.com/agresch/storm into STORM-3117
STORM-3117: prevent deleting blobs while topologies still active
This closes #2732
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/21bb1388
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/21bb1388
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/21bb1388
Branch: refs/heads/master
Commit: 21bb1388414d373572779289edc785c7e5aa52aa
Parents: 950e527 733d58c
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon Jun 25 13:33:30 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon Jun 25 13:33:30 2018 -0500
----------------------------------------------------------------------
.../storm/generated/IllegalStateException.java | 380 +++++++++++++++++++
.../jvm/org/apache/storm/generated/Nimbus.java | 130 ++++++-
.../utils/WrappedIllegalStateException.java | 32 ++
storm-client/src/py/storm/Nimbus.py | 20 +-
storm-client/src/py/storm/ttypes.py | 66 ++++
storm-client/src/storm.thrift | 6 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 27 +-
7 files changed, 649 insertions(+), 12 deletions(-)
----------------------------------------------------------------------