You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/06/25 18:55:39 UTC

[1/2] storm git commit: STORM-3117 prevent deleting blobs while topologies still active

Repository: storm
Updated Branches:
  refs/heads/master 950e527d5 -> 21bb13884


STORM-3117 prevent deleting blobs while topologies still active


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/733d58cd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/733d58cd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/733d58cd

Branch: refs/heads/master
Commit: 733d58cd687517678ebb6e78890327796fd0aef6
Parents: 7122a4e
Author: Aaron Gresch <ag...@yahoo-inc.com>
Authored: Thu Jun 21 14:56:19 2018 -0500
Committer: Aaron Gresch <ag...@yahoo-inc.com>
Committed: Mon Jun 25 13:04:25 2018 -0500

----------------------------------------------------------------------
 .../storm/generated/IllegalStateException.java  | 380 +++++++++++++++++++
 .../jvm/org/apache/storm/generated/Nimbus.java  | 130 ++++++-
 .../utils/WrappedIllegalStateException.java     |  32 ++
 storm-client/src/py/storm/Nimbus.py             |  20 +-
 storm-client/src/py/storm/ttypes.py             |  66 ++++
 storm-client/src/storm.thrift                   |   6 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  27 +-
 7 files changed, 649 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java b/storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java
new file mode 100644
index 0000000..4b16837
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.11.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.storm.generated;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)")
+public class IllegalStateException extends org.apache.storm.thrift.TException implements org.apache.storm.thrift.TBase<IllegalStateException, IllegalStateException._Fields>, java.io.Serializable, Cloneable, Comparable<IllegalStateException> {
+  private static final org.apache.storm.thrift.protocol.TStruct STRUCT_DESC = new org.apache.storm.thrift.protocol.TStruct("IllegalStateException");
+
+  private static final org.apache.storm.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("msg", org.apache.storm.thrift.protocol.TType.STRING, (short)1);
+
+  private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new IllegalStateExceptionStandardSchemeFactory();
+  private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new IllegalStateExceptionTupleSchemeFactory();
+
+  private java.lang.String msg; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.storm.thrift.TFieldIdEnum {
+    MSG((short)1, "msg");
+
+    private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+    static {
+      for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if it's not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // MSG
+          return MSG;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if it's not found.
+     */
+    public static _Fields findByName(java.lang.String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final java.lang.String _fieldName;
+
+    _Fields(short thriftId, java.lang.String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public java.lang.String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  public static final java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.MSG, new org.apache.storm.thrift.meta_data.FieldMetaData("msg", org.apache.storm.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.STRING)));
+    metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+    org.apache.storm.thrift.meta_data.FieldMetaData.addStructMetaDataMap(IllegalStateException.class, metaDataMap);
+  }
+
+  public IllegalStateException() {
+  }
+
+  public IllegalStateException(
+    java.lang.String msg)
+  {
+    this();
+    this.msg = msg;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public IllegalStateException(IllegalStateException other) {
+    if (other.is_set_msg()) {
+      this.msg = other.msg;
+    }
+  }
+
+  public IllegalStateException deepCopy() {
+    return new IllegalStateException(this);
+  }
+
+  @Override
+  public void clear() {
+    this.msg = null;
+  }
+
+  public java.lang.String get_msg() {
+    return this.msg;
+  }
+
+  public void set_msg(java.lang.String msg) {
+    this.msg = msg;
+  }
+
+  public void unset_msg() {
+    this.msg = null;
+  }
+
+  /** Returns true if field msg is set (has been assigned a value) and false otherwise */
+  public boolean is_set_msg() {
+    return this.msg != null;
+  }
+
+  public void set_msg_isSet(boolean value) {
+    if (!value) {
+      this.msg = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, java.lang.Object value) {
+    switch (field) {
+    case MSG:
+      if (value == null) {
+        unset_msg();
+      } else {
+        set_msg((java.lang.String)value);
+      }
+      break;
+
+    }
+  }
+
+  public java.lang.Object getFieldValue(_Fields field) {
+    switch (field) {
+    case MSG:
+      return get_msg();
+
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new java.lang.IllegalArgumentException();
+    }
+
+    switch (field) {
+    case MSG:
+      return is_set_msg();
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(java.lang.Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof IllegalStateException)
+      return this.equals((IllegalStateException)that);
+    return false;
+  }
+
+  public boolean equals(IllegalStateException that) {
+    if (that == null)
+      return false;
+    if (this == that)
+      return true;
+
+    boolean this_present_msg = true && this.is_set_msg();
+    boolean that_present_msg = true && that.is_set_msg();
+    if (this_present_msg || that_present_msg) {
+      if (!(this_present_msg && that_present_msg))
+        return false;
+      if (!this.msg.equals(that.msg))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int hashCode = 1;
+
+    hashCode = hashCode * 8191 + ((is_set_msg()) ? 131071 : 524287);
+    if (is_set_msg())
+      hashCode = hashCode * 8191 + msg.hashCode();
+
+    return hashCode;
+  }
+
+  @Override
+  public int compareTo(IllegalStateException other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = java.lang.Boolean.valueOf(is_set_msg()).compareTo(other.is_set_msg());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_msg()) {
+      lastComparison = org.apache.storm.thrift.TBaseHelper.compareTo(this.msg, other.msg);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.storm.thrift.protocol.TProtocol iprot) throws org.apache.storm.thrift.TException {
+    scheme(iprot).read(iprot, this);
+  }
+
+  public void write(org.apache.storm.thrift.protocol.TProtocol oprot) throws org.apache.storm.thrift.TException {
+    scheme(oprot).write(oprot, this);
+  }
+
+  @Override
+  public java.lang.String toString() {
+    java.lang.StringBuilder sb = new java.lang.StringBuilder("IllegalStateException(");
+    boolean first = true;
+
+    sb.append("msg:");
+    if (this.msg == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.msg);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.storm.thrift.TException {
+    // check for required fields
+    if (!is_set_msg()) {
+      throw new org.apache.storm.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString());
+    }
+
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.storm.thrift.protocol.TCompactProtocol(new org.apache.storm.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.storm.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+    try {
+      read(new org.apache.storm.thrift.protocol.TCompactProtocol(new org.apache.storm.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.storm.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class IllegalStateExceptionStandardSchemeFactory implements org.apache.storm.thrift.scheme.SchemeFactory {
+    public IllegalStateExceptionStandardScheme getScheme() {
+      return new IllegalStateExceptionStandardScheme();
+    }
+  }
+
+  private static class IllegalStateExceptionStandardScheme extends org.apache.storm.thrift.scheme.StandardScheme<IllegalStateException> {
+
+    public void read(org.apache.storm.thrift.protocol.TProtocol iprot, IllegalStateException struct) throws org.apache.storm.thrift.TException {
+      org.apache.storm.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.storm.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // MSG
+            if (schemeField.type == org.apache.storm.thrift.protocol.TType.STRING) {
+              struct.msg = iprot.readString();
+              struct.set_msg_isSet(true);
+            } else { 
+              org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+      struct.validate();
+    }
+
+    public void write(org.apache.storm.thrift.protocol.TProtocol oprot, IllegalStateException struct) throws org.apache.storm.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.msg != null) {
+        oprot.writeFieldBegin(MSG_FIELD_DESC);
+        oprot.writeString(struct.msg);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class IllegalStateExceptionTupleSchemeFactory implements org.apache.storm.thrift.scheme.SchemeFactory {
+    public IllegalStateExceptionTupleScheme getScheme() {
+      return new IllegalStateExceptionTupleScheme();
+    }
+  }
+
+  private static class IllegalStateExceptionTupleScheme extends org.apache.storm.thrift.scheme.TupleScheme<IllegalStateException> {
+
+    @Override
+    public void write(org.apache.storm.thrift.protocol.TProtocol prot, IllegalStateException struct) throws org.apache.storm.thrift.TException {
+      org.apache.storm.thrift.protocol.TTupleProtocol oprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot;
+      oprot.writeString(struct.msg);
+    }
+
+    @Override
+    public void read(org.apache.storm.thrift.protocol.TProtocol prot, IllegalStateException struct) throws org.apache.storm.thrift.TException {
+      org.apache.storm.thrift.protocol.TTupleProtocol iprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot;
+      struct.msg = iprot.readString();
+      struct.set_msg_isSet(true);
+    }
+  }
+
+  private static <S extends org.apache.storm.thrift.scheme.IScheme> S scheme(org.apache.storm.thrift.protocol.TProtocol proto) {
+    return (org.apache.storm.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java b/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
index 9fcc8ee..de9a3b3 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
@@ -85,7 +85,7 @@ public class Nimbus {
 
     public java.nio.ByteBuffer downloadBlobChunk(java.lang.String session) throws AuthorizationException, org.apache.storm.thrift.TException;
 
-    public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException;
+    public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException;
 
     public ListBlobsResult listBlobs(java.lang.String session) throws org.apache.storm.thrift.TException;
 
@@ -890,7 +890,7 @@ public class Nimbus {
       throw new org.apache.storm.thrift.TApplicationException(org.apache.storm.thrift.TApplicationException.MISSING_RESULT, "downloadBlobChunk failed: unknown result");
     }
 
-    public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException
+    public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException
     {
       send_deleteBlob(key);
       recv_deleteBlob();
@@ -903,7 +903,7 @@ public class Nimbus {
       sendBase("deleteBlob", args);
     }
 
-    public void recv_deleteBlob() throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException
+    public void recv_deleteBlob() throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException
     {
       deleteBlob_result result = new deleteBlob_result();
       receiveBase(result, "deleteBlob");
@@ -913,6 +913,9 @@ public class Nimbus {
       if (result.knf != null) {
         throw result.knf;
       }
+      if (result.ise != null) {
+        throw result.ise;
+      }
       return;
     }
 
@@ -2460,7 +2463,7 @@ public class Nimbus {
         prot.writeMessageEnd();
       }
 
-      public Void getResult() throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException {
+      public Void getResult() throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException {
         if (getState() != org.apache.storm.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
           throw new java.lang.IllegalStateException("Method call not finished!");
         }
@@ -4135,6 +4138,8 @@ public class Nimbus {
           result.aze = aze;
         } catch (KeyNotFoundException knf) {
           result.knf = knf;
+        } catch (IllegalStateException ise) {
+          result.ise = ise;
         }
         return result;
       }
@@ -6540,6 +6545,10 @@ public class Nimbus {
               result.knf = (KeyNotFoundException) e;
               result.set_knf_isSet(true);
               msg = result;
+            } else if (e instanceof IllegalStateException) {
+              result.ise = (IllegalStateException) e;
+              result.set_ise_isSet(true);
+              msg = result;
             } else if (e instanceof org.apache.storm.thrift.transport.TTransportException) {
               _LOGGER.error("TTransportException inside handler", e);
               fb.close();
@@ -29008,17 +29017,20 @@ public class Nimbus {
 
     private static final org.apache.storm.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("aze", org.apache.storm.thrift.protocol.TType.STRUCT, (short)1);
     private static final org.apache.storm.thrift.protocol.TField KNF_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("knf", org.apache.storm.thrift.protocol.TType.STRUCT, (short)2);
+    private static final org.apache.storm.thrift.protocol.TField ISE_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("ise", org.apache.storm.thrift.protocol.TType.STRUCT, (short)3);
 
     private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new deleteBlob_resultStandardSchemeFactory();
     private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new deleteBlob_resultTupleSchemeFactory();
 
     private AuthorizationException aze; // required
     private KeyNotFoundException knf; // required
+    private IllegalStateException ise; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.storm.thrift.TFieldIdEnum {
       AZE((short)1, "aze"),
-      KNF((short)2, "knf");
+      KNF((short)2, "knf"),
+      ISE((short)3, "ise");
 
       private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
 
@@ -29037,6 +29049,8 @@ public class Nimbus {
             return AZE;
           case 2: // KNF
             return KNF;
+          case 3: // ISE
+            return ISE;
           default:
             return null;
         }
@@ -29084,6 +29098,8 @@ public class Nimbus {
           new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, AuthorizationException.class)));
       tmpMap.put(_Fields.KNF, new org.apache.storm.thrift.meta_data.FieldMetaData("knf", org.apache.storm.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, KeyNotFoundException.class)));
+      tmpMap.put(_Fields.ISE, new org.apache.storm.thrift.meta_data.FieldMetaData("ise", org.apache.storm.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, IllegalStateException.class)));
       metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
       org.apache.storm.thrift.meta_data.FieldMetaData.addStructMetaDataMap(deleteBlob_result.class, metaDataMap);
     }
@@ -29093,11 +29109,13 @@ public class Nimbus {
 
     public deleteBlob_result(
       AuthorizationException aze,
-      KeyNotFoundException knf)
+      KeyNotFoundException knf,
+      IllegalStateException ise)
     {
       this();
       this.aze = aze;
       this.knf = knf;
+      this.ise = ise;
     }
 
     /**
@@ -29110,6 +29128,9 @@ public class Nimbus {
       if (other.is_set_knf()) {
         this.knf = new KeyNotFoundException(other.knf);
       }
+      if (other.is_set_ise()) {
+        this.ise = new IllegalStateException(other.ise);
+      }
     }
 
     public deleteBlob_result deepCopy() {
@@ -29120,6 +29141,7 @@ public class Nimbus {
     public void clear() {
       this.aze = null;
       this.knf = null;
+      this.ise = null;
     }
 
     public AuthorizationException get_aze() {
@@ -29168,6 +29190,29 @@ public class Nimbus {
       }
     }
 
+    public IllegalStateException get_ise() {
+      return this.ise;
+    }
+
+    public void set_ise(IllegalStateException ise) {
+      this.ise = ise;
+    }
+
+    public void unset_ise() {
+      this.ise = null;
+    }
+
+    /** Returns true if field ise is set (has been assigned a value) and false otherwise */
+    public boolean is_set_ise() {
+      return this.ise != null;
+    }
+
+    public void set_ise_isSet(boolean value) {
+      if (!value) {
+        this.ise = null;
+      }
+    }
+
     public void setFieldValue(_Fields field, java.lang.Object value) {
       switch (field) {
       case AZE:
@@ -29186,6 +29231,14 @@ public class Nimbus {
         }
         break;
 
+      case ISE:
+        if (value == null) {
+          unset_ise();
+        } else {
+          set_ise((IllegalStateException)value);
+        }
+        break;
+
       }
     }
 
@@ -29197,6 +29250,9 @@ public class Nimbus {
       case KNF:
         return get_knf();
 
+      case ISE:
+        return get_ise();
+
       }
       throw new java.lang.IllegalStateException();
     }
@@ -29212,6 +29268,8 @@ public class Nimbus {
         return is_set_aze();
       case KNF:
         return is_set_knf();
+      case ISE:
+        return is_set_ise();
       }
       throw new java.lang.IllegalStateException();
     }
@@ -29249,6 +29307,15 @@ public class Nimbus {
           return false;
       }
 
+      boolean this_present_ise = true && this.is_set_ise();
+      boolean that_present_ise = true && that.is_set_ise();
+      if (this_present_ise || that_present_ise) {
+        if (!(this_present_ise && that_present_ise))
+          return false;
+        if (!this.ise.equals(that.ise))
+          return false;
+      }
+
       return true;
     }
 
@@ -29264,6 +29331,10 @@ public class Nimbus {
       if (is_set_knf())
         hashCode = hashCode * 8191 + knf.hashCode();
 
+      hashCode = hashCode * 8191 + ((is_set_ise()) ? 131071 : 524287);
+      if (is_set_ise())
+        hashCode = hashCode * 8191 + ise.hashCode();
+
       return hashCode;
     }
 
@@ -29295,6 +29366,16 @@ public class Nimbus {
           return lastComparison;
         }
       }
+      lastComparison = java.lang.Boolean.valueOf(is_set_ise()).compareTo(other.is_set_ise());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_ise()) {
+        lastComparison = org.apache.storm.thrift.TBaseHelper.compareTo(this.ise, other.ise);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -29330,6 +29411,14 @@ public class Nimbus {
         sb.append(this.knf);
       }
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("ise:");
+      if (this.ise == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.ise);
+      }
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -29391,6 +29480,15 @@ public class Nimbus {
                 org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
+            case 3: // ISE
+              if (schemeField.type == org.apache.storm.thrift.protocol.TType.STRUCT) {
+                struct.ise = new IllegalStateException();
+                struct.ise.read(iprot);
+                struct.set_ise_isSet(true);
+              } else { 
+                org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
             default:
               org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
           }
@@ -29414,6 +29512,11 @@ public class Nimbus {
           struct.knf.write(oprot);
           oprot.writeFieldEnd();
         }
+        if (struct.ise != null) {
+          oprot.writeFieldBegin(ISE_FIELD_DESC);
+          struct.ise.write(oprot);
+          oprot.writeFieldEnd();
+        }
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
@@ -29438,19 +29541,25 @@ public class Nimbus {
         if (struct.is_set_knf()) {
           optionals.set(1);
         }
-        oprot.writeBitSet(optionals, 2);
+        if (struct.is_set_ise()) {
+          optionals.set(2);
+        }
+        oprot.writeBitSet(optionals, 3);
         if (struct.is_set_aze()) {
           struct.aze.write(oprot);
         }
         if (struct.is_set_knf()) {
           struct.knf.write(oprot);
         }
+        if (struct.is_set_ise()) {
+          struct.ise.write(oprot);
+        }
       }
 
       @Override
       public void read(org.apache.storm.thrift.protocol.TProtocol prot, deleteBlob_result struct) throws org.apache.storm.thrift.TException {
         org.apache.storm.thrift.protocol.TTupleProtocol iprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot;
-        java.util.BitSet incoming = iprot.readBitSet(2);
+        java.util.BitSet incoming = iprot.readBitSet(3);
         if (incoming.get(0)) {
           struct.aze = new AuthorizationException();
           struct.aze.read(iprot);
@@ -29461,6 +29570,11 @@ public class Nimbus {
           struct.knf.read(iprot);
           struct.set_knf_isSet(true);
         }
+        if (incoming.get(2)) {
+          struct.ise = new IllegalStateException();
+          struct.ise.read(iprot);
+          struct.set_ise_isSet(true);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java b/storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java
new file mode 100644
index 0000000..3df9c0a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.generated.IllegalStateException;
+
+public class WrappedIllegalStateException extends IllegalStateException {
+    public WrappedIllegalStateException(String msg) {
+        super(msg);
+    }
+
+    @Override
+    public String getMessage() {
+        return this.get_msg();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/Nimbus.py b/storm-client/src/py/storm/Nimbus.py
index 182dbe2..a4e6484 100644
--- a/storm-client/src/py/storm/Nimbus.py
+++ b/storm-client/src/py/storm/Nimbus.py
@@ -1235,6 +1235,8 @@ class Client(Iface):
             raise result.aze
         if result.knf is not None:
             raise result.knf
+        if result.ise is not None:
+            raise result.ise
         return
 
     def listBlobs(self, session):
@@ -2879,6 +2881,9 @@ class Processor(Iface, TProcessor):
         except KeyNotFoundException as knf:
             msg_type = TMessageType.REPLY
             result.knf = knf
+        except IllegalStateException as ise:
+            msg_type = TMessageType.REPLY
+            result.ise = ise
         except TApplicationException as ex:
             logging.exception('TApplication exception in handler')
             msg_type = TMessageType.EXCEPTION
@@ -6928,12 +6933,14 @@ class deleteBlob_result(object):
     Attributes:
      - aze
      - knf
+     - ise
     """
 
 
-    def __init__(self, aze=None, knf=None,):
+    def __init__(self, aze=None, knf=None, ise=None,):
         self.aze = aze
         self.knf = knf
+        self.ise = ise
 
     def read(self, iprot):
         if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -6956,6 +6963,12 @@ class deleteBlob_result(object):
                     self.knf.read(iprot)
                 else:
                     iprot.skip(ftype)
+            elif fid == 3:
+                if ftype == TType.STRUCT:
+                    self.ise = IllegalStateException()
+                    self.ise.read(iprot)
+                else:
+                    iprot.skip(ftype)
             else:
                 iprot.skip(ftype)
             iprot.readFieldEnd()
@@ -6974,6 +6987,10 @@ class deleteBlob_result(object):
             oprot.writeFieldBegin('knf', TType.STRUCT, 2)
             self.knf.write(oprot)
             oprot.writeFieldEnd()
+        if self.ise is not None:
+            oprot.writeFieldBegin('ise', TType.STRUCT, 3)
+            self.ise.write(oprot)
+            oprot.writeFieldEnd()
         oprot.writeFieldStop()
         oprot.writeStructEnd()
 
@@ -6995,6 +7012,7 @@ deleteBlob_result.thrift_spec = (
     None,  # 0
     (1, TType.STRUCT, 'aze', [AuthorizationException, None], None, ),  # 1
     (2, TType.STRUCT, 'knf', [KeyNotFoundException, None], None, ),  # 2
+    (3, TType.STRUCT, 'ise', [IllegalStateException, None], None, ),  # 3
 )
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py
index 48d2886..0d7ea12 100644
--- a/storm-client/src/py/storm/ttypes.py
+++ b/storm-client/src/py/storm/ttypes.py
@@ -1921,6 +1921,67 @@ class KeyNotFoundException(TException):
         return not (self == other)
 
 
+class IllegalStateException(TException):
+    """
+    Attributes:
+     - msg
+    """
+
+
+    def __init__(self, msg=None,):
+        self.msg = msg
+
+    def read(self, iprot):
+        if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
+            iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec])
+            return
+        iprot.readStructBegin()
+        while True:
+            (fname, ftype, fid) = iprot.readFieldBegin()
+            if ftype == TType.STOP:
+                break
+            if fid == 1:
+                if ftype == TType.STRING:
+                    self.msg = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString()
+                else:
+                    iprot.skip(ftype)
+            else:
+                iprot.skip(ftype)
+            iprot.readFieldEnd()
+        iprot.readStructEnd()
+
+    def write(self, oprot):
+        if oprot._fast_encode is not None and self.thrift_spec is not None:
+            oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec]))
+            return
+        oprot.writeStructBegin('IllegalStateException')
+        if self.msg is not None:
+            oprot.writeFieldBegin('msg', TType.STRING, 1)
+            oprot.writeString(self.msg.encode('utf-8') if sys.version_info[0] == 2 else self.msg)
+            oprot.writeFieldEnd()
+        oprot.writeFieldStop()
+        oprot.writeStructEnd()
+
+    def validate(self):
+        if self.msg is None:
+            raise TProtocolException(message='Required field msg is unset!')
+        return
+
+    def __str__(self):
+        return repr(self)
+
+    def __repr__(self):
+        L = ['%s=%r' % (key, value)
+             for key, value in self.__dict__.items()]
+        return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+    def __eq__(self, other):
+        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+    def __ne__(self, other):
+        return not (self == other)
+
+
 class KeyAlreadyExistsException(TException):
     """
     Attributes:
@@ -10201,6 +10262,11 @@ KeyNotFoundException.thrift_spec = (
     None,  # 0
     (1, TType.STRING, 'msg', 'UTF8', None, ),  # 1
 )
+all_structs.append(IllegalStateException)
+IllegalStateException.thrift_spec = (
+    None,  # 0
+    (1, TType.STRING, 'msg', 'UTF8', None, ),  # 1
+)
 all_structs.append(KeyAlreadyExistsException)
 KeyAlreadyExistsException.thrift_spec = (
     None,  # 0

http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index fefc6db..724c5b4 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -153,6 +153,10 @@ exception KeyNotFoundException {
   1: required string msg;
 }
 
+exception IllegalStateException {
+  1: required string msg;
+}
+
 exception KeyAlreadyExistsException {
   1: required string msg;
 }
@@ -744,7 +748,7 @@ service Nimbus {
   void setBlobMeta(1: string key, 2: SettableBlobMeta meta) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
   BeginDownloadResult beginBlobDownload(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
   binary downloadBlobChunk(1: string session) throws (1: AuthorizationException aze);
-  void deleteBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+  void deleteBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf, 3: IllegalStateException ise);
   ListBlobsResult listBlobs(1: string session); //empty string "" means start at the beginning
   i32 getBlobReplication(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
   i32 updateBlobReplication(1: string key, 2: i32 replication) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);

http://git-wip-us.apache.org/repos/asf/storm/blob/733d58cd/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index e19d0ef..3c16745 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -87,6 +87,7 @@ import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.generated.ExecutorSummary;
 import org.apache.storm.generated.GetInfoOptions;
+import org.apache.storm.generated.IllegalStateException;
 import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.generated.KeyAlreadyExistsException;
 import org.apache.storm.generated.KeyNotFoundException;
@@ -202,6 +203,7 @@ import org.apache.storm.utils.Utils.UptimeComputer;
 import org.apache.storm.utils.VersionInfo;
 import org.apache.storm.utils.WrappedAlreadyAliveException;
 import org.apache.storm.utils.WrappedAuthorizationException;
+import org.apache.storm.utils.WrappedIllegalStateException;
 import org.apache.storm.utils.WrappedInvalidTopologyException;
 import org.apache.storm.utils.WrappedNotAliveException;
 import org.apache.storm.validation.ConfigValidation;
@@ -820,10 +822,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return ret;
     }
 
+    /**
+     * NOTE: this can return false when a topology has just been activated.  The topology may still be
+     * in the STORMS_SUBTREE.
+     */
     private static boolean isTopologyActive(IStormClusterState state, String topoName) {
         return state.getTopoId(topoName).isPresent();
     }
 
+    private static boolean isTopologyActiveOrActivating(IStormClusterState state, String topoName) {
+        return isTopologyActive(state, topoName) || state.activeStorms().contains(topoName);
+    }
+
     private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc)
         throws NotAliveException, AuthorizationException, IOException {
         try {
@@ -2672,7 +2682,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 summary.set_assigned_memoffheap(resources.getAssignedMemOffHeap());
                 summary.set_assigned_cpu(resources.getAssignedCpu());
             }
-            summary.set_replication_count(getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId)));
+            try {
+                summary.set_replication_count(getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId)));
+            } catch (KeyNotFoundException e) {
+                // This could fail if a blob gets deleted by mistake.  Don't crash nimbus.
+                LOG.error("Unable to find blob entry", e);
+            }
             topologySummaries.add(summary);
         }
 
@@ -3555,8 +3570,16 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
 
     @Override
-    public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException, TException {
+    public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException, IllegalStateException, TException {
         try {
+            String topoName = ConfigUtils.getIdFromBlobKey(key);
+            if (topoName != null) {
+                if (isTopologyActiveOrActivating(stormClusterState, topoName)) {
+                    String message = "Attempting to delete blob " + key + " from under active topology " + topoName;
+                    LOG.warn(message);
+                    throw new WrappedIllegalStateException(message);
+                }
+            }
             blobStore.deleteBlob(key, getSubject());
             LOG.info("Deleted blob for key {}", key);
         } catch (Exception e) {


[2/2] storm git commit: Merge branch 'agresch_blob_delete' of https://github.com/agresch/storm into STORM-3117

Posted by bo...@apache.org.
Merge branch 'agresch_blob_delete' of https://github.com/agresch/storm into STORM-3117

STORM-3117: prevent deleting blobs while topologies still active

This closes #2732


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/21bb1388
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/21bb1388
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/21bb1388

Branch: refs/heads/master
Commit: 21bb1388414d373572779289edc785c7e5aa52aa
Parents: 950e527 733d58c
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon Jun 25 13:33:30 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon Jun 25 13:33:30 2018 -0500

----------------------------------------------------------------------
 .../storm/generated/IllegalStateException.java  | 380 +++++++++++++++++++
 .../jvm/org/apache/storm/generated/Nimbus.java  | 130 ++++++-
 .../utils/WrappedIllegalStateException.java     |  32 ++
 storm-client/src/py/storm/Nimbus.py             |  20 +-
 storm-client/src/py/storm/ttypes.py             |  66 ++++
 storm-client/src/storm.thrift                   |   6 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  27 +-
 7 files changed, 649 insertions(+), 12 deletions(-)
----------------------------------------------------------------------